This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-22514 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 8cf13a59bd59fb231d23d9c1bea41cd3794eb24c Author: Duo Zhang <zhang...@apache.org> AuthorDate: Sat Aug 3 07:53:27 2019 +0800 HBASE-22695 Store the rsgroup of a table in table configuration (#426) Signed-off-by: Guanghao Zhang <zg...@apache.org> --- .../org/apache/hadoop/hbase/HTableDescriptor.java | 6 + .../hadoop/hbase/client/TableDescriptor.java | 8 + .../hbase/client/TableDescriptorBuilder.java | 19 ++ .../apache/hadoop/hbase/rsgroup/RSGroupInfo.java | 42 ++- .../org/apache/hadoop/hbase/master/HMaster.java | 4 +- .../apache/hadoop/hbase/master/LoadBalancer.java | 49 +--- .../hbase/master/assignment/AssignmentManager.java | 6 +- .../apache/hadoop/hbase/rsgroup/RSGroupAdmin.java | 23 -- .../hadoop/hbase/rsgroup/RSGroupAdminClient.java | 13 +- .../hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java | 152 ++++------ .../hadoop/hbase/rsgroup/RSGroupAdminServer.java | 315 ++++++--------------- .../hbase/rsgroup/RSGroupAdminServiceImpl.java | 111 ++++++-- .../hbase/rsgroup/RSGroupBasedLoadBalancer.java | 47 ++- .../hadoop/hbase/rsgroup/RSGroupInfoManager.java | 23 -- .../hbase/rsgroup/RSGroupInfoManagerImpl.java | 115 +------- .../apache/hadoop/hbase/rsgroup/RSGroupUtil.java | 113 ++++++++ .../hadoop/hbase/master/TestRegionPlacement2.java | 6 +- .../balancer/RSGroupableBalancerTestBase.java | 84 +++--- .../balancer/TestRSGroupBasedLoadBalancer.java | 42 ++- ...lancerWithStochasticLoadBalancerAsInternal.java | 4 +- .../hadoop/hbase/rsgroup/TestRSGroupsAdmin1.java | 1 - .../hadoop/hbase/rsgroup/TestRSGroupsAdmin2.java | 104 +------ .../hadoop/hbase/rsgroup/TestRSGroupsBalance.java | 20 +- .../hadoop/hbase/rsgroup/TestRSGroupsBase.java | 8 +- .../hbase/rsgroup/TestRSGroupsOfflineMode.java | 6 +- .../hbase/rsgroup/VerifyingRSGroupAdminClient.java | 67 +++-- 26 files changed, 590 insertions(+), 798 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index 8866eba..188bed6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -987,4 +988,9 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescr protected ModifyableTableDescriptor getDelegateeForModification() { return delegatee; } + + @Override + public Optional<String> getRegionServerGroup() { + return delegatee.getRegionServerGroup(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java index fc5e69e..a452387 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Comparator; import java.util.Iterator; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Stream; import org.apache.hadoop.hbase.HConstants; @@ -184,6 +185,13 @@ public interface TableDescriptor { String getOwnerString(); /** + * Get the region server group this table belongs to. The regions of this table will be placed + * only on the region servers within this group. If not present, will be placed on + * {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo#DEFAULT_GROUP}. + */ + Optional<String> getRegionServerGroup(); + + /** * Getter for accessing the metadata associated with the key. * * @param key The key. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java index 20acf3a..3b68007 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; @@ -189,6 +190,9 @@ public class TableDescriptorBuilder { private static final Bytes PRIORITY_KEY = new Bytes(Bytes.toBytes(PRIORITY)); + private static final Bytes RSGROUP_KEY = + new Bytes(Bytes.toBytes(RSGroupInfo.TABLE_DESC_PROP_GROUP)); + /** * Relative priority of the table used for rpc scheduling */ @@ -538,6 +542,11 @@ public class TableDescriptorBuilder { return this; } + public TableDescriptorBuilder setRegionServerGroup(String group) { + desc.setValue(RSGROUP_KEY, new Bytes(Bytes.toBytes(group))); + return this; + } + public TableDescriptor build() { return new ModifyableTableDescriptor(desc); } @@ -1580,6 +1589,16 @@ public class TableDescriptorBuilder { public int getColumnFamilyCount() { return families.size(); } + + @Override + public Optional<String> getRegionServerGroup() { + Bytes value = values.get(RSGROUP_KEY); + if (value != null) { + return Optional.of(Bytes.toString(value.get(), value.getOffset(), value.getLength())); + } else { + return Optional.empty(); + } + } } private static Optional<CoprocessorDescriptor> toCoprocessorDescriptor(String spec) { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfo.java index 25e827d..ad55d1f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfo.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfo.java @@ -34,21 +34,38 @@ import org.apache.yetus.audience.InterfaceAudience; public class RSGroupInfo { public static final String DEFAULT_GROUP = "default"; public static final String NAMESPACE_DESC_PROP_GROUP = "hbase.rsgroup.name"; + public static final String TABLE_DESC_PROP_GROUP = "hbase.rsgroup.name"; private final String name; // Keep servers in a sorted set so has an expected ordering when displayed. private final SortedSet<Address> servers; // Keep tables sorted too. + /** + * @deprecated Since 3.0.0, will be removed in 4.0.0. The rsgroup information will be stored in + * the configuration of a table so this will be removed. + */ + @Deprecated private final SortedSet<TableName> tables; public RSGroupInfo(String name) { this(name, new TreeSet<Address>(), new TreeSet<TableName>()); } + RSGroupInfo(String name, SortedSet<Address> servers) { + this.name = name; + this.servers = servers == null ? new TreeSet<>() : new TreeSet<>(servers); + this.tables = new TreeSet<>(); + } + + /** + * @deprecated Since 3.0.0, will be removed in 4.0.0. The rsgroup information for a table will be + * stored in the configuration of a table so this will be removed. + */ + @Deprecated RSGroupInfo(String name, SortedSet<Address> servers, SortedSet<TableName> tables) { this.name = name; this.servers = (servers == null) ? new TreeSet<>() : new TreeSet<>(servers); - this.tables = (tables == null) ? new TreeSet<>() : new TreeSet<>(tables); + this.tables = (tables == null) ? new TreeSet<>() : new TreeSet<>(tables); } public RSGroupInfo(RSGroupInfo src) { @@ -100,23 +117,46 @@ public class RSGroupInfo { /** * Get set of tables that are members of the group. + * @deprecated Since 3.0.0, will be removed in 4.0.0. The rsgroup information will be stored in + * the configuration of a table so this will be removed. */ + @Deprecated public SortedSet<TableName> getTables() { return tables; } + /** + * @deprecated Since 3.0.0, will be removed in 4.0.0. The rsgroup information will be stored in + * the configuration of a table so this will be removed. + */ + @Deprecated public void addTable(TableName table) { tables.add(table); } + /** + * @deprecated Since 3.0.0, will be removed in 4.0.0. The rsgroup information will be stored in + * the configuration of a table so this will be removed. + */ + @Deprecated public void addAllTables(Collection<TableName> arg) { tables.addAll(arg); } + /** + * @deprecated Since 3.0.0, will be removed in 4.0.0. The rsgroup information will be stored in + * the configuration of a table so this will be removed. + */ + @Deprecated public boolean containsTable(TableName table) { return tables.contains(table); } + /** + * @deprecated Since 3.0.0, will be removed in 4.0.0. The rsgroup information will be stored in + * the configuration of a table so this will be removed. + */ + @Deprecated public boolean removeTable(TableName table) { return tables.remove(table); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 6864ce6..44a5b65 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1982,7 +1982,7 @@ public class HMaster extends HRegionServer implements MasterServices { // Replace with an async implementation from which you can get // a success/failure result. @VisibleForTesting - public void move(final byte[] encodedRegionName, byte[] destServerName) throws HBaseIOException { + public void move(final byte[] encodedRegionName, byte[] destServerName) throws IOException { RegionState regionState = assignmentManager.getRegionStates(). getRegionState(Bytes.toString(encodedRegionName)); @@ -3594,7 +3594,7 @@ public class HMaster extends HRegionServer implements MasterServices { * @param servers Region servers to decommission. */ public void decommissionRegionServers(final List<ServerName> servers, final boolean offload) - throws HBaseIOException { + throws IOException { List<ServerName> serversAdded = new ArrayList<>(servers.size()); // Place the decommission marker first. String parentZnode = getZooKeeper().getZNodePaths().drainingZNode; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java index 816636f..0fc544a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java @@ -19,12 +19,12 @@ package org.apache.hadoop.hbase.master; import edu.umd.cs.findbugs.annotations.Nullable; +import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics; -import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; @@ -65,95 +65,72 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObse ServerName BOGUS_SERVER_NAME = ServerName.valueOf("localhost,1,1"); /** - * Set the current cluster status. This allows a LoadBalancer to map host name to a server - * @param st + * Set the current cluster status. This allows a LoadBalancer to map host name to a server */ void setClusterMetrics(ClusterMetrics st); /** * Pass RegionStates and allow balancer to set the current cluster load. - * @param ClusterLoad */ void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> ClusterLoad); /** * Set the master service. - * @param masterServices */ void setMasterServices(MasterServices masterServices); /** * Perform the major balance operation - * @param tableName - * @param clusterState * @return List of plans */ - List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, - List<RegionInfo>> clusterState) throws HBaseIOException; + List<RegionPlan> balanceCluster(TableName tableName, + Map<ServerName, List<RegionInfo>> clusterState) throws IOException; /** * Perform the major balance operation - * @param clusterState * @return List of plans */ - List<RegionPlan> balanceCluster(Map<ServerName, - List<RegionInfo>> clusterState) throws HBaseIOException; + List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState) + throws IOException; /** * Perform a Round Robin assignment of regions. - * @param regions - * @param servers * @return Map of servername to regioninfos */ - Map<ServerName, List<RegionInfo>> roundRobinAssignment( - List<RegionInfo> regions, - List<ServerName> servers - ) throws HBaseIOException; + Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions, + List<ServerName> servers) throws IOException; /** * Assign regions to the previously hosting region server - * @param regions - * @param servers * @return List of plans */ @Nullable - Map<ServerName, List<RegionInfo>> retainAssignment( - Map<RegionInfo, ServerName> regions, - List<ServerName> servers - ) throws HBaseIOException; + Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions, + List<ServerName> servers) throws IOException; /** * Get a random region server from the list * @param regionInfo Region for which this selection is being done. - * @param servers - * @return Servername */ - ServerName randomAssignment( - RegionInfo regionInfo, List<ServerName> servers - ) throws HBaseIOException; + ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers) throws IOException; /** * Initialize the load balancer. Must be called after setters. - * @throws HBaseIOException */ - void initialize() throws HBaseIOException; + void initialize() throws IOException; /** * Marks the region as online at balancer. - * @param regionInfo - * @param sn */ void regionOnline(RegionInfo regionInfo, ServerName sn); /** * Marks the region as offline at balancer. - * @param regionInfo */ void regionOffline(RegionInfo regionInfo); - /* + /** * Notification that config has changed - * @param conf */ @Override void onConfigurationChange(Configuration conf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java index efd144d..d375ac5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java @@ -686,7 +686,7 @@ public class AssignmentManager { this.master.getServerManager().createDestinationServersList(serversToExclude)); // Return mid-method! return createAssignProcedures(assignments); - } catch (HBaseIOException hioe) { + } catch (IOException hioe) { LOG.warn("Failed roundRobinAssignment", hioe); } // If an error above, fall-through to this simpler assign. Last resort. @@ -2044,7 +2044,7 @@ public class AssignmentManager { } try { acceptPlan(regions, balancer.retainAssignment(retainMap, servers)); - } catch (HBaseIOException e) { + } catch (IOException e) { LOG.warn("unable to retain assignment", e); addToPendingAssignment(regions, retainMap.keySet()); } @@ -2059,7 +2059,7 @@ public class AssignmentManager { } try { acceptPlan(regions, balancer.roundRobinAssignment(hris, servers)); - } catch (HBaseIOException e) { + } catch (IOException e) { LOG.warn("unable to round-robin assignment", e); addToPendingAssignment(regions, hris); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdmin.java index 9ea996b..344d0b3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdmin.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdmin.java @@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.rsgroup; import java.io.IOException; import java.util.List; import java.util.Set; - -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.net.Address; import org.apache.yetus.audience.InterfaceAudience; @@ -36,22 +34,11 @@ public interface RSGroupAdmin { RSGroupInfo getRSGroupInfo(String groupName) throws IOException; /** - * Gets {@code RSGroupInfo} for the given table's group. - */ - RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException; - - /** * Move given set of servers to the specified target RegionServer group. */ void moveServers(Set<Address> servers, String targetGroup) throws IOException; /** - * Move given set of tables to the specified target RegionServer group. - * This will unassign all of a table's region so it can be reassigned to the correct group. - */ - void moveTables(Set<TableName> tables, String targetGroup) throws IOException; - - /** * Creates a new RegionServer group with the given name. */ void addRSGroup(String groupName) throws IOException; @@ -80,16 +67,6 @@ public interface RSGroupAdmin { RSGroupInfo getRSGroupOfServer(Address hostPort) throws IOException; /** - * Move given set of servers and tables to the specified target RegionServer group. - * @param servers set of servers to move - * @param tables set of tables to move - * @param targetGroup the target group name - * @throws IOException if moving the server and tables fail - */ - void moveServersAndTables(Set<Address> servers, Set<TableName> tables, - String targetGroup) throws IOException; - - /** * Remove decommissioned servers from rsgroup. * 1. Sometimes we may find the server aborted due to some hardware failure and we must offline * the server for repairing. Or we need to move some servers to join other clusters. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminClient.java index e7ab7f2..07f0efd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminClient.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveServe import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.collect.Sets; /** @@ -62,12 +63,17 @@ public class RSGroupAdminClient implements RSGroupAdmin { stub = RSGroupAdminService.newBlockingStub(admin.coprocessorService()); } + // for writing UTs + @VisibleForTesting + protected RSGroupAdminClient() { + } + @Override public RSGroupInfo getRSGroupInfo(String groupName) throws IOException { try { GetRSGroupInfoResponse resp = stub.getRSGroupInfo(null, - GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build()); - if(resp.hasRSGroupInfo()) { + GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build()); + if (resp.hasRSGroupInfo()) { return ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()); } return null; @@ -76,7 +82,6 @@ public class RSGroupAdminClient implements RSGroupAdmin { } } - @Override public RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException { GetRSGroupInfoOfTableRequest request = GetRSGroupInfoOfTableRequest.newBuilder().setTableName( ProtobufUtil.toProtoTableName(tableName)).build(); @@ -111,7 +116,6 @@ public class RSGroupAdminClient implements RSGroupAdmin { } } - @Override public void moveTables(Set<TableName> tables, String targetGroup) throws IOException { MoveTablesRequest.Builder builder = MoveTablesRequest.newBuilder().setTargetGroup(targetGroup); for(TableName tableName: tables) { @@ -192,7 +196,6 @@ public class RSGroupAdminClient implements RSGroupAdmin { } } - @Override public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String targetGroup) throws IOException { MoveServersAndTablesRequest.Builder builder = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java index 2d5af04..3c4530f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java @@ -27,13 +27,10 @@ import java.util.stream.Collectors; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.NamespaceDescriptor; -import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.client.SnapshotDescription; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.constraint.ConstraintException; import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; @@ -47,21 +44,16 @@ import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.common.collect.Sets; // TODO: Encapsulate MasterObserver functions into separate subclass. @CoreCoprocessor @InterfaceAudience.Private public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { - static final Logger LOG = LoggerFactory.getLogger(RSGroupAdminEndpoint.class); - - private MasterServices master; // Only instance of RSGroupInfoManager. RSGroup aware load balancers ask for this instance on // their setup. + private MasterServices master; private RSGroupInfoManager groupInfoManager; private RSGroupAdminServer groupAdminServer; private RSGroupAdminServiceImpl groupAdminService = new RSGroupAdminServiceImpl(); @@ -110,117 +102,91 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { return groupAdminService; } - private void assignTableToGroup(TableDescriptor desc) throws IOException { - String groupName = - master.getClusterSchema().getNamespace(desc.getTableName().getNamespaceAsString()) - .getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); - if (groupName == null) { - groupName = RSGroupInfo.DEFAULT_GROUP; - } - RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName); - if (rsGroupInfo == null) { - throw new ConstraintException( - "Default RSGroup (" + groupName + ") for this table's namespace does not exist."); - } - if (!rsGroupInfo.containsTable(desc.getTableName())) { - LOG.debug("Pre-moving table " + desc.getTableName() + " to RSGroup " + groupName); - groupAdminServer.moveTables(Sets.newHashSet(desc.getTableName()), groupName); - } - } - ///////////////////////////////////////////////////////////////////////////// // MasterObserver overrides ///////////////////////////////////////////////////////////////////////////// - private boolean rsgroupHasServersOnline(TableDescriptor desc) throws IOException { - String groupName; - try { - groupName = master.getClusterSchema().getNamespace(desc.getTableName().getNamespaceAsString()) - .getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); - if (groupName == null) { - groupName = RSGroupInfo.DEFAULT_GROUP; - } - } catch (MasterNotRunningException | PleaseHoldException e) { - LOG.info("Master has not initialized yet; temporarily using default RSGroup '" + - RSGroupInfo.DEFAULT_GROUP + "' for deploy of system table"); - groupName = RSGroupInfo.DEFAULT_GROUP; + @Override + public void postClearDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx, + List<ServerName> servers, List<ServerName> notClearedServers) throws IOException { + Set<Address> clearedServer = + servers.stream().filter(server -> !notClearedServers.contains(server)) + .map(ServerName::getAddress).collect(Collectors.toSet()); + if (!clearedServer.isEmpty()) { + groupAdminServer.removeServers(clearedServer); } + } - RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName); - if (rsGroupInfo == null) { - throw new ConstraintException( - "Default RSGroup (" + groupName + ") for this table's " + "namespace does not exist."); + private void checkGroupExists(Optional<String> optGroupName) throws IOException { + if (optGroupName.isPresent()) { + String groupName = optGroupName.get(); + if (groupAdminServer.getRSGroupInfo(groupName) == null) { + throw new ConstraintException("Region server group " + groupName + " does not exit"); + } } + } - for (ServerName onlineServer : master.getServerManager().createDestinationServersList()) { - if (rsGroupInfo.getServers().contains(onlineServer.getAddress())) { + private boolean rsgroupHasServersOnline(TableDescriptor desc) throws IOException { + RSGroupInfo rsGroupInfo; + Optional<String> optGroupName = desc.getRegionServerGroup(); + if (optGroupName.isPresent()) { + String groupName = optGroupName.get(); + if (groupName.equals(RSGroupInfo.DEFAULT_GROUP)) { + // do not check for default group + return true; + } + rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName); + if (rsGroupInfo == null) { + throw new ConstraintException( + "RSGroup " + groupName + " for table " + desc.getTableName() + " does not exist"); + } + } else { + NamespaceDescriptor nd = + master.getClusterSchema().getNamespace(desc.getTableName().getNamespaceAsString()); + String groupNameOfNs = nd.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); + if (groupNameOfNs == null || groupNameOfNs.equals(RSGroupInfo.DEFAULT_GROUP)) { + // do not check for default group return true; } + rsGroupInfo = groupAdminServer.getRSGroupInfo(groupNameOfNs); + if (rsGroupInfo == null) { + throw new ConstraintException("RSGroup " + groupNameOfNs + " for table " + + desc.getTableName() + "(inherit from namespace) does not exist"); + } } - return false; + return master.getServerManager().createDestinationServersList().stream() + .anyMatch(onlineServer -> rsGroupInfo.containsServer(onlineServer.getAddress())); } @Override - public void preCreateTableAction(final ObserverContext<MasterCoprocessorEnvironment> ctx, - final TableDescriptor desc, final RegionInfo[] regions) throws IOException { + public void preCreateTableAction(ObserverContext<MasterCoprocessorEnvironment> ctx, + TableDescriptor desc, RegionInfo[] regions) throws IOException { + checkGroupExists(desc.getRegionServerGroup()); if (!desc.getTableName().isSystemTable() && !rsgroupHasServersOnline(desc)) { - throw new HBaseIOException("No online servers in the rsgroup, which table " + - desc.getTableName().getNameAsString() + " belongs to"); + throw new HBaseIOException("No online servers in the rsgroup for " + desc); } } - // Assign table to default RSGroup. - @Override - public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx, - TableDescriptor desc, RegionInfo[] regions) throws IOException { - assignTableToGroup(desc); - } - - // Remove table from its RSGroup. @Override - public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> ctx, - TableName tableName) throws IOException { - try { - RSGroupInfo group = groupAdminServer.getRSGroupInfoOfTable(tableName); - if (group != null) { - LOG.debug(String.format("Removing deleted table '%s' from rsgroup '%s'", tableName, - group.getName())); - groupAdminServer.moveTables(Sets.newHashSet(tableName), null); - } - } catch (IOException ex) { - LOG.debug("Failed to perform RSGroup information cleanup for table: " + tableName, ex); - } + public TableDescriptor preModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx, + TableName tableName, TableDescriptor currentDescriptor, TableDescriptor newDescriptor) + throws IOException { + checkGroupExists(newDescriptor.getRegionServerGroup()); + return MasterObserver.super.preModifyTable(ctx, tableName, currentDescriptor, newDescriptor); } @Override public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, NamespaceDescriptor ns) throws IOException { - String group = ns.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); - if (group != null && groupAdminServer.getRSGroupInfo(group) == null) { - throw new ConstraintException("Region server group " + group + " does not exit"); - } + checkGroupExists( + Optional.ofNullable(ns.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP))); } @Override public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, - NamespaceDescriptor currentNsDesc, NamespaceDescriptor newNsDesc) throws IOException { - preCreateNamespace(ctx, newNsDesc); - } - - @Override - public void preCloneSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx, - SnapshotDescription snapshot, TableDescriptor desc) throws IOException { - assignTableToGroup(desc); - } - - @Override - public void postClearDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx, - List<ServerName> servers, List<ServerName> notClearedServers) throws IOException { - Set<Address> clearedServer = - servers.stream().filter(server -> !notClearedServers.contains(server)) - .map(ServerName::getAddress).collect(Collectors.toSet()); - if (!clearedServer.isEmpty()) { - groupAdminServer.removeServers(clearedServer); - } + NamespaceDescriptor currentNsDescriptor, NamespaceDescriptor newNsDescriptor) + throws IOException { + checkGroupExists(Optional + .ofNullable(newNsDescriptor.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP))); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java index e6bf69e..59950e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java @@ -26,15 +26,16 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.Function; - import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.constraint.ConstraintException; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.LoadBalancer; @@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.ServerManager; -import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.assignment.RegionStateNode; import org.apache.hadoop.hbase.net.Address; import org.apache.yetus.audience.InterfaceAudience; @@ -85,14 +85,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { return rsGroupInfoManager.getRSGroup(groupName); } - @Override - public RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException { - // We are reading across two Maps in the below with out synchronizing across - // them; should be safe most of the time. - String groupName = rsGroupInfoManager.getRSGroupOfTable(tableName); - return groupName == null? null: rsGroupInfoManager.getRSGroup(groupName); - } - private void checkOnlineServersOnly(Set<Address> servers) throws ConstraintException { // This uglyness is because we only have Address, not ServerName. // Online servers are keyed by ServerName. @@ -159,104 +151,24 @@ public class RSGroupAdminServer implements RSGroupAdmin { } /** - * Check servers and tables. - * - * @param servers servers to move - * @param tables tables to move - * @param targetGroupName target group name - * @throws IOException if nulls or if servers and tables not belong to the same group - */ - private void checkServersAndTables(Set<Address> servers, Set<TableName> tables, - String targetGroupName) throws IOException { - // Presume first server's source group. Later ensure all servers are from this group. - Address firstServer = servers.iterator().next(); - RSGroupInfo tmpSrcGrp = rsGroupInfoManager.getRSGroupOfServer(firstServer); - if (tmpSrcGrp == null) { - // Be careful. This exception message is tested for in TestRSGroupsBase... - throw new ConstraintException("Source RSGroup for server " + firstServer - + " does not exist."); - } - RSGroupInfo srcGrp = new RSGroupInfo(tmpSrcGrp); - - // Only move online servers - checkOnlineServersOnly(servers); - - // Ensure all servers are of same rsgroup. - for (Address server: servers) { - String tmpGroup = rsGroupInfoManager.getRSGroupOfServer(server).getName(); - if (!tmpGroup.equals(srcGrp.getName())) { - throw new ConstraintException("Move server request should only come from one source " + - "RSGroup. Expecting only " + srcGrp.getName() + " but contains " + tmpGroup); - } - } - - // Ensure all tables and servers are of same rsgroup. - for (TableName table : tables) { - String tmpGroup = rsGroupInfoManager.getRSGroupOfTable(table); - if (!tmpGroup.equals(srcGrp.getName())) { - throw new ConstraintException("Move table request should only come from one source " + - "RSGroup. Expecting only " + srcGrp.getName() + " but contains " + tmpGroup); - } - } - - if (srcGrp.getServers().size() <= servers.size() && srcGrp.getTables().size() > tables.size()) { - throw new ConstraintException("Cannot leave a RSGroup " + srcGrp.getName() + - " that contains tables without servers to host them."); - } - } - - /** - * Move every region from servers which are currently located on these servers, - * but should not be located there. - * + * Move every region from servers which are currently located on these servers, but should not be + * located there. * @param servers the servers that will move to new group * @param targetGroupName the target group name * @throws IOException if moving the server and tables fail */ private void moveServerRegionsFromGroup(Set<Address> servers, String targetGroupName) throws IOException { - moveRegionsBetweenGroups(servers, targetGroupName, - rs -> getRegions(rs), - info -> { - try { - RSGroupInfo group = getRSGroupInfo(targetGroupName); - return group.containsTable(info.getTable()); - } catch (IOException e) { - e.printStackTrace(); - return false; - } - }, - rs -> rs.getHostname()); - } - - /** - * Moves regions of tables which are not on target group servers. - * - * @param tables the tables that will move to new group - * @param targetGroupName the target group name - * @throws IOException if moving the region fails - */ - private void moveTableRegionsToGroup(Set<TableName> tables, String targetGroupName) - throws IOException { - moveRegionsBetweenGroups(tables, targetGroupName, - table -> { - if (master.getAssignmentManager().isTableDisabled(table)) { - return new ArrayList<>(); - } - return master.getAssignmentManager().getRegionStates().getRegionsOfTable(table); - }, - info -> { - try { - RSGroupInfo group = getRSGroupInfo(targetGroupName); - ServerName sn = - master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(info); - return group.containsServer(sn.getAddress()); - } catch (IOException e) { - e.printStackTrace(); - return false; - } - }, - table -> table.getNameWithNamespaceInclAsString()); + moveRegionsBetweenGroups(servers, targetGroupName, rs -> getRegions(rs), info -> { + try { + String groupName = RSGroupUtil.getRSGroupInfo(master, rsGroupInfoManager, info.getTable()) + .map(RSGroupInfo::getName).orElse(RSGroupInfo.DEFAULT_GROUP); + return groupName.equals(targetGroupName); + } catch (IOException e) { + LOG.warn("Failed to test group for region {} and target group {}", info, targetGroupName); + return false; + } + }, rs -> rs.getHostname()); } private <T> void moveRegionsBetweenGroups(Set<T> regionsOwners, String targetGroupName, @@ -321,9 +233,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { } } - @edu.umd.cs.findbugs.annotations.SuppressWarnings( - value="RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE", - justification="Ignoring complaint because don't know what it is complaining about") @Override public void moveServers(Set<Address> servers, String targetGroupName) throws IOException { if (servers == null) { @@ -364,9 +273,16 @@ public class RSGroupAdminServer implements RSGroupAdmin { "RSGroup. Expecting only " + srcGrp.getName() + " but contains " + tmpGroup); } } - if (srcGrp.getServers().size() <= servers.size() && srcGrp.getTables().size() > 0) { - throw new ConstraintException("Cannot leave a RSGroup " + srcGrp.getName() + - " that contains tables without servers to host them."); + if (srcGrp.getServers().size() <= servers.size()) { + // check if there are still tables reference this group + for (TableDescriptor td : master.getTableDescriptors().getAll().values()) { + Optional<String> optGroupName = td.getRegionServerGroup(); + if (optGroupName.isPresent() && optGroupName.get().equals(srcGrp.getName())) { + throw new ConstraintException( + "Cannot leave a RSGroup " + srcGrp.getName() + " that contains tables('" + + td.getTableName() + "' at least) without servers to host them."); + } + } } // MovedServers may be < passed in 'servers'. @@ -378,38 +294,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { } @Override - public void moveTables(Set<TableName> tables, String targetGroup) throws IOException { - if (tables == null) { - throw new ConstraintException("The list of servers cannot be null."); - } - if (tables.size() < 1) { - LOG.debug("moveTables() passed an empty set. Ignoring."); - return; - } - - // Hold a lock on the manager instance while moving servers to prevent - // another writer changing our state while we are working. - synchronized (rsGroupInfoManager) { - if(targetGroup != null) { - RSGroupInfo destGroup = rsGroupInfoManager.getRSGroup(targetGroup); - if(destGroup == null) { - throw new ConstraintException("Target " + targetGroup + " RSGroup does not exist."); - } - if(destGroup.getServers().size() < 1) { - throw new ConstraintException("Target RSGroup must have at least one server."); - } - } - rsGroupInfoManager.moveTables(tables, targetGroup); - - // targetGroup is null when a table is being deleted. In this case no further - // action is required. - if (targetGroup != null) { - moveTableRegionsToGroup(tables, targetGroup); - } - } - } - - @Override public void addRSGroup(String name) throws IOException { rsGroupInfoManager.addRSGroup(new RSGroupInfo(name)); } @@ -423,17 +307,18 @@ public class RSGroupAdminServer implements RSGroupAdmin { if (rsGroupInfo == null) { throw new ConstraintException("RSGroup " + name + " does not exist"); } - int tableCount = rsGroupInfo.getTables().size(); - if (tableCount > 0) { - throw new ConstraintException("RSGroup " + name + " has " + tableCount + - " tables; you must remove these tables from the rsgroup before " + - "the rsgroup can be removed."); - } int serverCount = rsGroupInfo.getServers().size(); if (serverCount > 0) { throw new ConstraintException("RSGroup " + name + " has " + serverCount + - " servers; you must remove these servers from the RSGroup before" + - "the RSGroup can be removed."); + " servers; you must remove these servers from the RSGroup before" + + " the RSGroup can be removed."); + } + for (TableDescriptor td : master.getTableDescriptors().getAll().values()) { + if (td.getRegionServerGroup().map(name::equals).orElse(false)) { + throw new ConstraintException("RSGroup " + name + " is already referenced by " + + td.getTableName() + "; you must remove all the tables from the rsgroup before " + + "the rsgroup can be removed."); + } } for (NamespaceDescriptor ns : master.getClusterSchema().getNamespaces()) { String nsGroup = ns.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); @@ -458,27 +343,29 @@ public class RSGroupAdminServer implements RSGroupAdmin { } if (getRSGroupInfo(groupName) == null) { - throw new ConstraintException("RSGroup does not exist: "+groupName); + throw new ConstraintException("RSGroup does not exist: " + groupName); } // Only allow one balance run at at time. Map<String, RegionState> groupRIT = rsGroupGetRegionsInTransition(groupName); if (groupRIT.size() > 0) { LOG.debug("Not running balancer because {} region(s) in transition: {}", groupRIT.size(), - StringUtils.abbreviate( - master.getAssignmentManager().getRegionStates().getRegionsInTransition().toString(), - 256)); + StringUtils.abbreviate( + master.getAssignmentManager().getRegionStates().getRegionsInTransition().toString(), + 256)); return false; } if (serverManager.areDeadServersInProgress()) { LOG.debug("Not running balancer because processing dead regionserver(s): {}", - serverManager.getDeadServers()); + serverManager.getDeadServers()); return false; } - //We balance per group instead of per table + // We balance per group instead of per table List<RegionPlan> plans = new ArrayList<>(); - for(Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> tableMap: - getRSGroupAssignmentsByTable(groupName).entrySet()) { + Map<TableName, Map<ServerName, List<RegionInfo>>> assignmentsByTable = + getRSGroupAssignmentsByTable(groupName); + for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> tableMap : assignmentsByTable + .entrySet()) { LOG.info("Creating partial plan for table {} : {}", tableMap.getKey(), tableMap.getValue()); List<RegionPlan> partialPlans = balancer.balanceCluster(tableMap.getValue()); LOG.info("Partial plan for table {} : {}", tableMap.getKey(), partialPlans); @@ -507,104 +394,66 @@ public class RSGroupAdminServer implements RSGroupAdmin { } @Override - public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String targetGroup) - throws IOException { + public void removeServers(Set<Address> servers) throws IOException { if (servers == null || servers.isEmpty()) { - throw new ConstraintException("The list of servers to move cannot be null or empty."); - } - if (tables == null || tables.isEmpty()) { - throw new ConstraintException("The list of tables to move cannot be null or empty."); + throw new ConstraintException("The set of servers to remove cannot be null or empty."); } - - //check target group - getAndCheckRSGroupInfo(targetGroup); - - // Hold a lock on the manager instance while moving servers and tables to prevent + // Hold a lock on the manager instance while moving servers to prevent // another writer changing our state while we are working. synchronized (rsGroupInfoManager) { - //check servers and tables status - checkServersAndTables(servers, tables, targetGroup); - - //Move servers and tables to a new group. - String srcGroup = getRSGroupOfServer(servers.iterator().next()).getName(); - rsGroupInfoManager.moveServersAndTables(servers, tables, srcGroup, targetGroup); - - //move regions on these servers which do not belong to group tables - moveServerRegionsFromGroup(servers, targetGroup); - //move regions of these tables which are not on group servers - moveTableRegionsToGroup(tables, targetGroup); + // check the set of servers + checkForDeadOrOnlineServers(servers); + rsGroupInfoManager.removeServers(servers); + LOG.info("Remove decommissioned servers {} from RSGroup done", servers); } - LOG.info("Move servers and tables done. Severs: {}, Tables: {} => {}", servers, tables, - targetGroup); } - @Override - public void removeServers(Set<Address> servers) throws IOException { - { - if (servers == null || servers.isEmpty()) { - throw new ConstraintException("The set of servers to remove cannot be null or empty."); - } - // Hold a lock on the manager instance while moving servers to prevent - // another writer changing our state while we are working. - synchronized (rsGroupInfoManager) { - //check the set of servers - checkForDeadOrOnlineServers(servers); - rsGroupInfoManager.removeServers(servers); - LOG.info("Remove decommissioned servers {} from RSGroup done", servers); - } + private boolean isTableInGroup(TableName tableName, String groupName, + Set<TableName> tablesInGroupCache) throws IOException { + if (tablesInGroupCache.contains(tableName)) { + return true; } + if (RSGroupUtil.getRSGroupInfo(master, rsGroupInfoManager, tableName).map(RSGroupInfo::getName) + .orElse(RSGroupInfo.DEFAULT_GROUP).equals(groupName)) { + tablesInGroupCache.add(tableName); + return true; + } + return false; } private Map<String, RegionState> rsGroupGetRegionsInTransition(String groupName) - throws IOException { + throws IOException { Map<String, RegionState> rit = Maps.newTreeMap(); - AssignmentManager am = master.getAssignmentManager(); - for(TableName tableName : getRSGroupInfo(groupName).getTables()) { - for(RegionInfo regionInfo: am.getRegionStates().getRegionsOfTable(tableName)) { - RegionState state = am.getRegionStates().getRegionTransitionState(regionInfo); - if(state != null) { - rit.put(regionInfo.getEncodedName(), state); - } + Set<TableName> tablesInGroupCache = new HashSet<>(); + for (RegionStateNode regionNode : master.getAssignmentManager().getRegionsInTransition()) { + TableName tn = regionNode.getTable(); + if (isTableInGroup(tn, groupName, tablesInGroupCache)) { + rit.put(regionNode.getRegionInfo().getEncodedName(), regionNode.toRegionState()); } } return rit; } private Map<TableName, Map<ServerName, List<RegionInfo>>> - getRSGroupAssignmentsByTable(String groupName) throws IOException { + getRSGroupAssignmentsByTable(String groupName) throws IOException { Map<TableName, Map<ServerName, List<RegionInfo>>> result = Maps.newHashMap(); - RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName); - Map<TableName, Map<ServerName, List<RegionInfo>>> assignments = Maps.newHashMap(); - for(Map.Entry<RegionInfo, ServerName> entry: - master.getAssignmentManager().getRegionStates().getRegionAssignments().entrySet()) { - TableName currTable = entry.getKey().getTable(); - ServerName currServer = entry.getValue(); - RegionInfo currRegion = entry.getKey(); - if (rsGroupInfo.getTables().contains(currTable)) { - assignments.computeIfAbsent(currTable, key -> new HashMap<>()) - .computeIfAbsent(currServer, key -> new ArrayList<>()) - .add(currRegion); + Set<TableName> tablesInGroupCache = new HashSet<>(); + for (Map.Entry<RegionInfo, ServerName> entry : master.getAssignmentManager().getRegionStates() + .getRegionAssignments().entrySet()) { + RegionInfo region = entry.getKey(); + TableName tn = region.getTable(); + ServerName server = entry.getValue(); + if (isTableInGroup(tn, groupName, tablesInGroupCache)) { + result.computeIfAbsent(tn, k -> new HashMap<>()) + .computeIfAbsent(server, k -> new ArrayList<>()).add(region); } } - - Map<ServerName, List<RegionInfo>> serverMap = Maps.newHashMap(); - for(ServerName serverName: master.getServerManager().getOnlineServers().keySet()) { - if(rsGroupInfo.getServers().contains(serverName.getAddress())) { - serverMap.put(serverName, Collections.emptyList()); - } - } - - // add all tables that are members of the group - for (TableName tableName : rsGroupInfo.getTables()) { - if (assignments.containsKey(tableName)) { - Map<ServerName, List<RegionInfo>> tableResults = new HashMap<>(serverMap); - - Map<ServerName, List<RegionInfo>> tableAssignments = assignments.get(tableName); - tableResults.putAll(tableAssignments); - - result.put(tableName, tableResults); - - LOG.debug("Adding assignments for {}: {}", tableName, tableAssignments); + RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName); + for (ServerName serverName : master.getServerManager().getOnlineServers().keySet()) { + if (rsGroupInfo.containsServer(serverName.getAddress())) { + for (Map<ServerName, List<RegionInfo>> map : result.values()) { + map.computeIfAbsent(serverName, k -> Collections.emptyList()); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServiceImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServiceImpl.java index 918a4fe..6bc4519 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServiceImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServiceImpl.java @@ -20,14 +20,24 @@ package org.apache.hadoop.hbase.rsgroup; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos; @@ -57,6 +67,8 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.Permission.Action; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.collect.Sets; @@ -68,6 +80,8 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Sets; */ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { + private static final Logger LOG = LoggerFactory.getLogger(RSGroupAdminServiceImpl.class); + private MasterServices master; private RSGroupAdminServer groupAdminServer; @@ -107,12 +121,17 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { return userProvider.getCurrent(); } + // for backward compatible + private RSGroupInfo fillTables(RSGroupInfo rsGroupInfo) throws IOException { + return RSGroupUtil.fillTables(rsGroupInfo, master.getTableDescriptors().getAll().values()); + } + @Override public void getRSGroupInfo(RpcController controller, GetRSGroupInfoRequest request, RpcCallback<GetRSGroupInfoResponse> done) { GetRSGroupInfoResponse.Builder builder = GetRSGroupInfoResponse.newBuilder(); String groupName = request.getRSGroupName(); - RSGroupAdminEndpoint.LOG.info( + LOG.info( master.getClientIdAuditPrefix() + " initiates rsgroup info retrieval, group=" + groupName); try { if (master.getMasterCoprocessorHost() != null) { @@ -121,7 +140,7 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { checkPermission("getRSGroupInfo"); RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName); if (rsGroupInfo != null) { - builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(rsGroupInfo)); + builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(fillTables(rsGroupInfo))); } if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().postGetRSGroupInfo(groupName); @@ -137,17 +156,24 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { RpcCallback<GetRSGroupInfoOfTableResponse> done) { GetRSGroupInfoOfTableResponse.Builder builder = GetRSGroupInfoOfTableResponse.newBuilder(); TableName tableName = ProtobufUtil.toTableName(request.getTableName()); - RSGroupAdminEndpoint.LOG.info( + LOG.info( master.getClientIdAuditPrefix() + " initiates rsgroup info retrieval, table=" + tableName); try { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preGetRSGroupInfoOfTable(tableName); } checkPermission("getRSGroupInfoOfTable"); - RSGroupInfo RSGroupInfo = groupAdminServer.getRSGroupInfoOfTable(tableName); - if (RSGroupInfo != null) { - builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(RSGroupInfo)); + Optional<RSGroupInfo> optGroup = + RSGroupUtil.getRSGroupInfo(master, groupAdminServer, tableName); + if (optGroup.isPresent()) { + builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(fillTables(optGroup.get()))); + } else { + if (master.getTableStateManager().isTablePresent(tableName)) { + RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP); + builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(fillTables(rsGroupInfo))); + } } + if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().postGetRSGroupInfoOfTable(tableName); } @@ -165,8 +191,8 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { for (HBaseProtos.ServerName el : request.getServersList()) { hostPorts.add(Address.fromParts(el.getHostName(), el.getPort())); } - RSGroupAdminEndpoint.LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts + - " to rsgroup " + request.getTargetGroup()); + LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts + " to rsgroup " + + request.getTargetGroup()); try { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preMoveServers(hostPorts, request.getTargetGroup()); @@ -182,6 +208,27 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { done.run(builder.build()); } + private void moveTablesAndWait(Set<TableName> tables, String targetGroup) throws IOException { + List<Long> procIds = new ArrayList<Long>(); + for (TableName tableName : tables) { + TableDescriptor oldTd = master.getTableDescriptors().get(tableName); + if (oldTd == null) { + continue; + } + TableDescriptor newTd = + TableDescriptorBuilder.newBuilder(oldTd).setRegionServerGroup(targetGroup).build(); + procIds.add(master.modifyTable(tableName, newTd, HConstants.NO_NONCE, HConstants.NO_NONCE)); + } + for (long procId : procIds) { + Procedure<?> proc = master.getMasterProcedureExecutor().getProcedure(procId); + if (proc == null) { + continue; + } + ProcedureSyncWait.waitForProcedureToCompleteIOE(master.getMasterProcedureExecutor(), proc, + Long.MAX_VALUE); + } + } + @Override public void moveTables(RpcController controller, MoveTablesRequest request, RpcCallback<MoveTablesResponse> done) { @@ -190,14 +237,14 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { for (HBaseProtos.TableName tableName : request.getTableNameList()) { tables.add(ProtobufUtil.toTableName(tableName)); } - RSGroupAdminEndpoint.LOG.info(master.getClientIdAuditPrefix() + " move tables " + tables + - " to rsgroup " + request.getTargetGroup()); + LOG.info(master.getClientIdAuditPrefix() + " move tables " + tables + " to rsgroup " + + request.getTargetGroup()); try { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preMoveTables(tables, request.getTargetGroup()); } checkPermission("moveTables"); - groupAdminServer.moveTables(tables, request.getTargetGroup()); + moveTablesAndWait(tables, request.getTargetGroup()); if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().postMoveTables(tables, request.getTargetGroup()); } @@ -211,8 +258,7 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { public void addRSGroup(RpcController controller, AddRSGroupRequest request, RpcCallback<AddRSGroupResponse> done) { AddRSGroupResponse.Builder builder = AddRSGroupResponse.newBuilder(); - RSGroupAdminEndpoint.LOG - .info(master.getClientIdAuditPrefix() + " add rsgroup " + request.getRSGroupName()); + LOG.info(master.getClientIdAuditPrefix() + " add rsgroup " + request.getRSGroupName()); try { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preAddRSGroup(request.getRSGroupName()); @@ -232,8 +278,7 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { public void removeRSGroup(RpcController controller, RemoveRSGroupRequest request, RpcCallback<RemoveRSGroupResponse> done) { RemoveRSGroupResponse.Builder builder = RemoveRSGroupResponse.newBuilder(); - RSGroupAdminEndpoint.LOG - .info(master.getClientIdAuditPrefix() + " remove rsgroup " + request.getRSGroupName()); + LOG.info(master.getClientIdAuditPrefix() + " remove rsgroup " + request.getRSGroupName()); try { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preRemoveRSGroup(request.getRSGroupName()); @@ -253,7 +298,7 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { public void balanceRSGroup(RpcController controller, BalanceRSGroupRequest request, RpcCallback<BalanceRSGroupResponse> done) { BalanceRSGroupResponse.Builder builder = BalanceRSGroupResponse.newBuilder(); - RSGroupAdminEndpoint.LOG.info( + LOG.info( master.getClientIdAuditPrefix() + " balance rsgroup, group=" + request.getRSGroupName()); try { if (master.getMasterCoprocessorHost() != null) { @@ -276,14 +321,28 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { public void listRSGroupInfos(RpcController controller, ListRSGroupInfosRequest request, RpcCallback<ListRSGroupInfosResponse> done) { ListRSGroupInfosResponse.Builder builder = ListRSGroupInfosResponse.newBuilder(); - RSGroupAdminEndpoint.LOG.info(master.getClientIdAuditPrefix() + " list rsgroup"); + LOG.info(master.getClientIdAuditPrefix() + " list rsgroup"); try { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preListRSGroups(); } checkPermission("listRSGroup"); - for (RSGroupInfo RSGroupInfo : groupAdminServer.listRSGroups()) { - builder.addRSGroupInfo(ProtobufUtil.toProtoGroupInfo(RSGroupInfo)); + List<RSGroupInfo> rsGroupInfos = groupAdminServer.listRSGroups().stream() + .map(RSGroupInfo::new).collect(Collectors.toList()); + Map<String, RSGroupInfo> name2Info = new HashMap<>(); + for (RSGroupInfo rsGroupInfo : rsGroupInfos) { + name2Info.put(rsGroupInfo.getName(), rsGroupInfo); + } + for (TableDescriptor td : master.getTableDescriptors().getAll().values()) { + String groupName = td.getRegionServerGroup().orElse(RSGroupInfo.DEFAULT_GROUP); + RSGroupInfo rsGroupInfo = name2Info.get(groupName); + if (rsGroupInfo != null) { + rsGroupInfo.addTable(td.getTableName()); + } + } + for (RSGroupInfo rsGroupInfo : rsGroupInfos) { + // TODO: this can be done at once outside this loop, do not need to scan all every time. + builder.addRSGroupInfo(ProtobufUtil.toProtoGroupInfo(rsGroupInfo)); } if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().postListRSGroups(); @@ -300,8 +359,7 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { GetRSGroupInfoOfServerResponse.Builder builder = GetRSGroupInfoOfServerResponse.newBuilder(); Address hp = Address.fromParts(request.getServer().getHostName(), request.getServer().getPort()); - RSGroupAdminEndpoint.LOG - .info(master.getClientIdAuditPrefix() + " initiates rsgroup info retrieval, server=" + hp); + LOG.info(master.getClientIdAuditPrefix() + " initiates rsgroup info retrieval, server=" + hp); try { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preGetRSGroupInfoOfServer(hp); @@ -309,7 +367,7 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { checkPermission("getRSGroupInfoOfServer"); RSGroupInfo info = groupAdminServer.getRSGroupOfServer(hp); if (info != null) { - builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(info)); + builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(fillTables(info))); } if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().postGetRSGroupInfoOfServer(hp); @@ -332,15 +390,16 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { for (HBaseProtos.TableName tableName : request.getTableNameList()) { tables.add(ProtobufUtil.toTableName(tableName)); } - RSGroupAdminEndpoint.LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts + - " and tables " + tables + " to rsgroup" + request.getTargetGroup()); + LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts + " and tables " + + tables + " to rsgroup" + request.getTargetGroup()); try { if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preMoveServersAndTables(hostPorts, tables, request.getTargetGroup()); } checkPermission("moveServersAndTables"); - groupAdminServer.moveServersAndTables(hostPorts, tables, request.getTargetGroup()); + groupAdminServer.moveServers(hostPorts, request.getTargetGroup()); + moveTablesAndWait(tables, request.getTargetGroup()); if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().postMoveServersAndTables(hostPorts, tables, request.getTargetGroup()); @@ -359,7 +418,7 @@ class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { for (HBaseProtos.ServerName el : request.getServersList()) { servers.add(Address.fromParts(el.getHostName(), el.getPort())); } - RSGroupAdminEndpoint.LOG.info( + LOG.info( master.getClientIdAuditPrefix() + " remove decommissioned servers from rsgroup: " + servers); try { if (master.getMasterCoprocessorHost() != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java index be76805..0f943d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.HBaseIOException; @@ -111,13 +110,13 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { @Override public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List<RegionInfo>> - clusterState) throws HBaseIOException { + clusterState) throws IOException { return balanceCluster(clusterState); } @Override public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState) - throws HBaseIOException { + throws IOException { if (!isOnline()) { throw new ConstraintException( RSGroupInfoManager.class.getSimpleName() + " is not online, unable to perform balance"); @@ -168,8 +167,8 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { } @Override - public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions, - List<ServerName> servers) throws HBaseIOException { + public Map<ServerName, List<RegionInfo>> roundRobinAssignment( + List<RegionInfo> regions, List<ServerName> servers) throws IOException { Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap(); ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create(); ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create(); @@ -198,12 +197,11 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { try { Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>(); ListMultimap<String, RegionInfo> groupToRegion = ArrayListMultimap.create(); + RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP); for (RegionInfo region : regions.keySet()) { - String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); - if (groupName == null) { - LOG.debug("Group not found for table " + region.getTable() + ", using default"); - groupName = RSGroupInfo.DEFAULT_GROUP; - } + String groupName = + RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, region.getTable()) + .orElse(defaultInfo).getName(); groupToRegion.put(groupName, region); } for (String key : groupToRegion.keySet()) { @@ -234,7 +232,7 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { @Override public ServerName randomAssignment(RegionInfo region, - List<ServerName> servers) throws HBaseIOException { + List<ServerName> servers) throws IOException { ListMultimap<String,RegionInfo> regionMap = LinkedListMultimap.create(); ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create(); generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap); @@ -246,12 +244,11 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { ListMultimap<String, RegionInfo> regionMap, ListMultimap<String, ServerName> serverMap) throws HBaseIOException { try { + RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP); for (RegionInfo region : regions) { - String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); - if (groupName == null) { - LOG.debug("Group not found for table " + region.getTable() + ", using default"); - groupName = RSGroupInfo.DEFAULT_GROUP; - } + String groupName = + RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, region.getTable()) + .orElse(defaultInfo).getName(); regionMap.put(groupName, region); } for (String groupKey : regionMap.keySet()) { @@ -299,11 +296,11 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { } private Pair<Map<ServerName, List<RegionInfo>>, List<RegionPlan>> correctAssignments( - Map<ServerName, List<RegionInfo>> existingAssignments) throws HBaseIOException{ + Map<ServerName, List<RegionInfo>> existingAssignments) throws IOException { // To return Map<ServerName, List<RegionInfo>> correctAssignments = new TreeMap<>(); List<RegionPlan> regionPlansForMisplacedRegions = new ArrayList<>(); - + RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP); for (Map.Entry<ServerName, List<RegionInfo>> assignments : existingAssignments.entrySet()){ ServerName currentHostServer = assignments.getKey(); correctAssignments.put(currentHostServer, new LinkedList<>()); @@ -311,15 +308,11 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { for (RegionInfo region : regions) { RSGroupInfo targetRSGInfo = null; try { - String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); - if (groupName == null) { - LOG.debug("Group not found for table " + region.getTable() + ", using default"); - groupName = RSGroupInfo.DEFAULT_GROUP; - } - targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName); + targetRSGInfo = + RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, region.getTable()) + .orElse(defaultInfo); } catch (IOException exp) { - LOG.debug("RSGroup information null for region of table " + region.getTable(), - exp); + LOG.debug("RSGroup information null for region of table " + region.getTable(), exp); } if (targetRSGInfo == null || !targetRSGInfo.containsServer(currentHostServer.getAddress())) { // region is mis-placed @@ -336,7 +329,7 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { } @Override - public void initialize() throws HBaseIOException { + public void initialize() throws IOException { try { if (rsGroupInfoManager == null) { List<RSGroupAdminEndpoint> cps = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java index 70aeabf..28f7c1f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.rsgroup; import java.io.IOException; import java.util.List; import java.util.Set; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.net.Address; import org.apache.yetus.audience.InterfaceAudience; @@ -64,18 +63,6 @@ public interface RSGroupInfoManager { RSGroupInfo getRSGroup(String groupName) throws IOException; /** - * Get the group membership of a table - */ - String getRSGroupOfTable(TableName tableName) throws IOException; - - /** - * Set the group membership of a set of tables - * @param tableNames set of tables to move - * @param groupName name of group of tables to move to - */ - void moveTables(Set<TableName> tableNames, String groupName) throws IOException; - - /** * List the existing {@code RSGroupInfo}s. */ List<RSGroupInfo> listRSGroups() throws IOException; @@ -92,16 +79,6 @@ public interface RSGroupInfoManager { boolean isOnline(); /** - * Move servers and tables to a new group. - * @param servers list of servers, must be part of the same group - * @param tables set of tables to move - * @param srcGroup groupName being moved from - * @param dstGroup groupName being moved to - */ - void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String srcGroup, - String dstGroup) throws IOException; - - /** * Remove decommissioned servers from rsgroup * @param servers set of servers to remove */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java index 8aa7520..37f3ce6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java @@ -23,10 +23,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.NavigableSet; import java.util.OptionalLong; import java.util.Set; import java.util.SortedSet; @@ -143,7 +141,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { // There two Maps are immutable and wholesale replaced on each modification // so are safe to access concurrently. See class comment. private volatile Map<String, RSGroupInfo> rsGroupMap = Collections.emptyMap(); - private volatile Map<TableName, String> tableMap = Collections.emptyMap(); private final MasterServices masterServices; private final AsyncClusterConnection conn; @@ -261,44 +258,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { } @Override - public String getRSGroupOfTable(TableName tableName) { - return tableMap.get(tableName); - } - - @Override - public synchronized void moveTables(Set<TableName> tableNames, String groupName) - throws IOException { - // Check if rsGroupMap contains the destination rsgroup - if (groupName != null && !rsGroupMap.containsKey(groupName)) { - throw new DoNotRetryIOException("Group " + groupName + " does not exist"); - } - - // Make a copy of rsGroupMap to update - Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); - - // Remove tables from their original rsgroups - // and update the copy of rsGroupMap - for (TableName tableName : tableNames) { - if (tableMap.containsKey(tableName)) { - RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName))); - src.removeTable(tableName); - newGroupMap.put(src.getName(), src); - } - } - - // Add tables to the destination rsgroup - // and update the copy of rsGroupMap - if (groupName != null) { - RSGroupInfo dstGroup = new RSGroupInfo(newGroupMap.get(groupName)); - dstGroup.addAllTables(tableNames); - newGroupMap.put(dstGroup.getName(), dstGroup); - } - - // Flush according to the updated copy of rsGroupMap - flushConfig(newGroupMap); - } - - @Override public synchronized void removeRSGroup(String groupName) throws IOException { if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) { throw new DoNotRetryIOException( @@ -311,7 +270,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { @Override public List<RSGroupInfo> listRSGroups() { - return Lists.newLinkedList(rsGroupMap.values()); + return Lists.newArrayList(rsGroupMap.values()); } @Override @@ -320,31 +279,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { } @Override - public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String srcGroup, - String dstGroup) throws IOException { - // get server's group - RSGroupInfo srcGroupInfo = getRSGroupInfo(srcGroup); - RSGroupInfo dstGroupInfo = getRSGroupInfo(dstGroup); - - // move servers - for (Address el : servers) { - srcGroupInfo.removeServer(el); - dstGroupInfo.addServer(el); - } - // move tables - for (TableName tableName : tables) { - srcGroupInfo.removeTable(tableName); - dstGroupInfo.addTable(tableName); - } - - // flush changed groupinfo - Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); - newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo); - newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo); - flushConfig(newGroupMap); - } - - @Override public synchronized void removeServers(Set<Address> servers) throws IOException { Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>(); for (Address el : servers) { @@ -425,7 +359,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { * startup of the manager. */ private synchronized void refresh(boolean forceOnline) throws IOException { - List<RSGroupInfo> groupList = new LinkedList<>(); + List<RSGroupInfo> groupList = new ArrayList<>(); // Overwrite anything read from zk, group table is source of truth // if online read from GROUP table @@ -437,37 +371,20 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { groupList.addAll(retrieveGroupListFromZookeeper()); } - // refresh default group, prune - NavigableSet<TableName> orphanTables = new TreeSet<>(); - for (String entry : masterServices.getTableDescriptors().getAll().keySet()) { - orphanTables.add(TableName.valueOf(entry)); - } - for (RSGroupInfo group : groupList) { - if (!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { - orphanTables.removeAll(group.getTables()); - } - } - // This is added to the last of the list so it overwrites the 'default' rsgroup loaded // from region group table or zk - groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(), orphanTables)); + groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers())); // populate the data HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(); - HashMap<TableName, String> newTableMap = Maps.newHashMap(); for (RSGroupInfo group : groupList) { newGroupMap.put(group.getName(), group); - for (TableName table : group.getTables()) { - newTableMap.put(table, group.getName()); - } } - resetRSGroupAndTableMaps(newGroupMap, newTableMap); + resetRSGroupMap(newGroupMap); updateCacheOfRSGroups(rsGroupMap.keySet()); } - private synchronized Map<TableName, String> flushConfigTable(Map<String, RSGroupInfo> groupMap) - throws IOException { - Map<TableName, String> newTableMap = Maps.newHashMap(); + private void flushConfigTable(Map<String, RSGroupInfo> groupMap) throws IOException { List<Mutation> mutations = Lists.newArrayList(); // populate deletes @@ -484,15 +401,11 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { Put p = new Put(Bytes.toBytes(RSGroupInfo.getName())); p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray()); mutations.add(p); - for (TableName entry : RSGroupInfo.getTables()) { - newTableMap.put(entry, RSGroupInfo.getName()); - } } if (mutations.size() > 0) { multiMutate(mutations); } - return newTableMap; } private synchronized void flushConfig() throws IOException { @@ -500,8 +413,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { } private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException { - Map<TableName, String> newTableMap; - // For offline mode persistence is still unavailable // We're refreshing in-memory state but only for servers in default group if (!isOnline()) { @@ -516,7 +427,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP); if (!oldGroupMap.equals(newGroupMap) /* compare both tables and servers in other groups */ || !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables()) - /* compare tables in default group */) { + /* compare tables in default group */) { throw new IOException("Only servers in default group can be updated during offline mode"); } @@ -533,11 +444,11 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { return; } - /* For online mode, persist to Zookeeper */ - newTableMap = flushConfigTable(newGroupMap); + /* For online mode, persist to hbase:rsgroup and Zookeeper */ + flushConfigTable(newGroupMap); // Make changes visible after having been persisted to the source of truth - resetRSGroupAndTableMaps(newGroupMap, newTableMap); + resetRSGroupMap(newGroupMap); try { String groupBasePath = @@ -575,11 +486,9 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { /** * Make changes visible. Caller must be synchronized on 'this'. */ - private void resetRSGroupAndTableMaps(Map<String, RSGroupInfo> newRSGroupMap, - Map<TableName, String> newTableMap) { + private void resetRSGroupMap(Map<String, RSGroupInfo> newRSGroupMap) { // Make maps Immutable. this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap); - this.tableMap = Collections.unmodifiableMap(newTableMap); } /** @@ -597,7 +506,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { return masterServices.getServerManager().getOnlineServersList(); } LOG.debug("Reading online RS from zookeeper"); - List<ServerName> servers = new LinkedList<>(); + List<ServerName> servers = new ArrayList<>(); try { for (String el : ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) { servers.add(ServerName.parseServerName(el)); @@ -633,7 +542,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { // the rsGroupMap then writing it out. private synchronized void updateDefaultServers(SortedSet<Address> servers) throws IOException { RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP); - RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers, info.getTables()); + RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers); HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); newGroupMap.put(newInfo.getName(), newInfo); flushConfig(newGroupMap); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupUtil.java new file mode 100644 index 0000000..a08d236 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupUtil.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.rsgroup; + +import java.io.IOException; +import java.util.Collection; +import java.util.Optional; +import java.util.function.Predicate; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.master.ClusterSchema; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper class for RSGroup implementation + */ +@InterfaceAudience.Private +final class RSGroupUtil { + + private static final Logger LOG = LoggerFactory.getLogger(RSGroupUtil.class); + + private RSGroupUtil() { + } + + @FunctionalInterface + private interface GetRSGroup { + RSGroupInfo get(String groupName) throws IOException; + } + + private static Optional<RSGroupInfo> getRSGroupInfo(MasterServices master, GetRSGroup getter, + TableName tableName) throws IOException { + TableDescriptor td = master.getTableDescriptors().get(tableName); + if (td == null) { + return Optional.empty(); + } + Optional<String> optGroupNameOfTable = td.getRegionServerGroup(); + if (optGroupNameOfTable.isPresent()) { + RSGroupInfo group = getter.get(optGroupNameOfTable.get()); + if (group != null) { + return Optional.of(group); + } + } + ClusterSchema clusterSchema = master.getClusterSchema(); + if (clusterSchema == null) { + if (TableName.isMetaTableName(tableName)) { + LOG.info("Can not get the namespace rs group config for meta table, since the" + + " meta table is not online yet, will use default group to assign meta first"); + } else { + LOG.warn("ClusterSchema is null, can only use default rsgroup, should not happen?"); + } + return Optional.empty(); + } + NamespaceDescriptor nd = clusterSchema.getNamespace(tableName.getNamespaceAsString()); + String groupNameOfNs = nd.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); + if (groupNameOfNs == null) { + return Optional.empty(); + } + return Optional.ofNullable(getter.get(groupNameOfNs)); + } + + /** + * Will try to get the rsgroup from {@link TableDescriptor} first, and then try to get the rsgroup + * from the {@link NamespaceDescriptor}. If still not present, return empty. + */ + static Optional<RSGroupInfo> getRSGroupInfo(MasterServices master, RSGroupInfoManager manager, + TableName tableName) throws IOException { + return getRSGroupInfo(master, manager::getRSGroup, tableName); + } + + /** + * Will try to get the rsgroup from {@link TableDescriptor} first, and then try to get the rsgroup + * from the {@link NamespaceDescriptor}. If still not present, return empty. + */ + static Optional<RSGroupInfo> getRSGroupInfo(MasterServices master, RSGroupAdmin admin, + TableName tableName) throws IOException { + return getRSGroupInfo(master, admin::getRSGroupInfo, tableName); + } + + /** + * Fill the tables field for {@link RSGroupInfo}, for backward compatibility. + */ + @SuppressWarnings("deprecation") + static RSGroupInfo fillTables(RSGroupInfo rsGroupInfo, Collection<TableDescriptor> tds) { + RSGroupInfo newRsGroupInfo = new RSGroupInfo(rsGroupInfo); + Predicate<TableDescriptor> filter; + if (rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { + filter = td -> { + Optional<String> optGroupName = td.getRegionServerGroup(); + return !optGroupName.isPresent() || optGroupName.get().equals(RSGroupInfo.DEFAULT_GROUP); + }; + } else { + filter = td -> { + Optional<String> optGroupName = td.getRegionServerGroup(); + return optGroupName.isPresent() && optGroupName.get().equals(newRsGroupInfo.getName()); + }; + } + tds.stream().filter(filter).map(TableDescriptor::getTableName) + .forEach(newRsGroupInfo::addTable); + return newRsGroupInfo; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement2.java index 6dc3711..47337f9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement2.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -26,7 +27,6 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; @@ -82,7 +82,7 @@ public class TestRegionPlacement2 { } @Test - public void testFavoredNodesPresentForRoundRobinAssignment() throws HBaseIOException { + public void testFavoredNodesPresentForRoundRobinAssignment() throws IOException { LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(TEST_UTIL.getConfiguration()); balancer.setMasterServices(TEST_UTIL.getMiniHBaseCluster().getMaster()); balancer.initialize(); @@ -143,7 +143,7 @@ public class TestRegionPlacement2 { } @Test - public void testFavoredNodesPresentForRandomAssignment() throws HBaseIOException { + public void testFavoredNodesPresentForRandomAssignment() throws IOException { LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(TEST_UTIL.getConfiguration()); balancer.setMasterServices(TEST_UTIL.getMiniHBaseCluster().getMaster()); balancer.initialize(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java index 570bb3a..4c00bcf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; @@ -60,17 +61,13 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists; public class RSGroupableBalancerTestBase { static SecureRandom rand = new SecureRandom(); - static String[] groups = new String[] {RSGroupInfo.DEFAULT_GROUP, "dg2", "dg3", "dg4"}; + static String[] groups = new String[] { RSGroupInfo.DEFAULT_GROUP, "dg2", "dg3", "dg4" }; static TableName table0 = TableName.valueOf("dt0"); - static TableName[] tables = - new TableName[] { TableName.valueOf("dt1"), - TableName.valueOf("dt2"), - TableName.valueOf("dt3"), - TableName.valueOf("dt4")}; + static TableName[] tables = new TableName[] { TableName.valueOf("dt1"), TableName.valueOf("dt2"), + TableName.valueOf("dt3"), TableName.valueOf("dt4") }; static List<ServerName> servers; static Map<String, RSGroupInfo> groupMap; - static Map<TableName, String> tableMap = new HashMap<>(); - static List<TableDescriptor> tableDescs; + static Map<TableName, TableDescriptor> tableDescs; int[] regionAssignment = new int[] { 2, 5, 7, 10, 4, 3, 1 }; static int regionId = 0; @@ -113,20 +110,19 @@ public class RSGroupableBalancerTestBase { /** * All regions have an assignment. */ - protected void assertImmediateAssignment(List<RegionInfo> regions, - List<ServerName> servers, - Map<RegionInfo, ServerName> assignments) - throws IOException { + protected void assertImmediateAssignment(List<RegionInfo> regions, List<ServerName> servers, + Map<RegionInfo, ServerName> assignments) throws IOException { for (RegionInfo region : regions) { assertTrue(assignments.containsKey(region)); ServerName server = assignments.get(region); TableName tableName = region.getTable(); - String groupName = getMockedGroupInfoManager().getRSGroupOfTable(tableName); + String groupName = + tableDescs.get(tableName).getRegionServerGroup().orElse(RSGroupInfo.DEFAULT_GROUP); assertTrue(StringUtils.isNotEmpty(groupName)); RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup(groupName); assertTrue("Region is not correctly assigned to group servers.", - gInfo.containsServer(server.getAddress())); + gInfo.containsServer(server.getAddress())); } } @@ -169,16 +165,13 @@ public class RSGroupableBalancerTestBase { ServerName oldAssignedServer = existing.get(r); TableName tableName = r.getTable(); String groupName = - getMockedGroupInfoManager().getRSGroupOfTable(tableName); + tableDescs.get(tableName).getRegionServerGroup().orElse(RSGroupInfo.DEFAULT_GROUP); assertTrue(StringUtils.isNotEmpty(groupName)); - RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup( - groupName); - assertTrue( - "Region is not correctly assigned to group servers.", - gInfo.containsServer(currentServer.getAddress())); - if (oldAssignedServer != null - && onlineHostNames.contains(oldAssignedServer - .getHostname())) { + RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup(groupName); + assertTrue("Region is not correctly assigned to group servers.", + gInfo.containsServer(currentServer.getAddress())); + if (oldAssignedServer != null && + onlineHostNames.contains(oldAssignedServer.getHostname())) { // this region was previously assigned somewhere, and that // host is still around, then the host must have been is a // different group. @@ -358,13 +351,12 @@ public class RSGroupableBalancerTestBase { /** * Construct group info, with each group having at least one server. - * * @param servers the servers * @param groups the groups * @return the map */ - protected static Map<String, RSGroupInfo> constructGroupInfo( - List<ServerName> servers, String[] groups) { + protected static Map<String, RSGroupInfo> constructGroupInfo(List<ServerName> servers, + String[] groups) { assertTrue(servers != null); assertTrue(servers.size() >= groups.length); int index = 0; @@ -377,8 +369,7 @@ public class RSGroupableBalancerTestBase { } while (index < servers.size()) { int grpIndex = rand.nextInt(groups.length); - groupMap.get(groups[grpIndex]).addServer( - servers.get(index).getAddress()); + groupMap.get(groups[grpIndex]).addServer(servers.get(index).getAddress()); index++; } return groupMap; @@ -389,29 +380,28 @@ public class RSGroupableBalancerTestBase { * @param hasBogusTable there is a table that does not determine the group * @return the list of table descriptors */ - protected static List<TableDescriptor> constructTableDesc(boolean hasBogusTable) { - List<TableDescriptor> tds = Lists.newArrayList(); + protected static Map<TableName, TableDescriptor> constructTableDesc(boolean hasBogusTable) { + Map<TableName, TableDescriptor> tds = new HashMap<>(); int index = rand.nextInt(groups.length); for (int i = 0; i < tables.length; i++) { - TableDescriptor htd = TableDescriptorBuilder.newBuilder(tables[i]).build(); int grpIndex = (i + index) % groups.length; String groupName = groups[grpIndex]; - tableMap.put(tables[i], groupName); - tds.add(htd); + TableDescriptor htd = + TableDescriptorBuilder.newBuilder(tables[i]).setRegionServerGroup(groupName).build(); + tds.put(htd.getTableName(), htd); } if (hasBogusTable) { - tableMap.put(table0, ""); - tds.add(TableDescriptorBuilder.newBuilder(table0).build()); + tds.put(table0, TableDescriptorBuilder.newBuilder(table0).setRegionServerGroup("").build()); } return tds; } protected static MasterServices getMockedMaster() throws IOException { TableDescriptors tds = Mockito.mock(TableDescriptors.class); - Mockito.when(tds.get(tables[0])).thenReturn(tableDescs.get(0)); - Mockito.when(tds.get(tables[1])).thenReturn(tableDescs.get(1)); - Mockito.when(tds.get(tables[2])).thenReturn(tableDescs.get(2)); - Mockito.when(tds.get(tables[3])).thenReturn(tableDescs.get(3)); + Mockito.when(tds.get(tables[0])).thenReturn(tableDescs.get(tables[0])); + Mockito.when(tds.get(tables[1])).thenReturn(tableDescs.get(tables[1])); + Mockito.when(tds.get(tables[2])).thenReturn(tableDescs.get(tables[2])); + Mockito.when(tds.get(tables[3])).thenReturn(tableDescs.get(tables[3])); MasterServices services = Mockito.mock(HMaster.class); Mockito.when(services.getTableDescriptors()).thenReturn(tds); AssignmentManager am = Mockito.mock(AssignmentManager.class); @@ -430,13 +420,6 @@ public class RSGroupableBalancerTestBase { Mockito.when(gm.listRSGroups()).thenReturn( Lists.newLinkedList(groupMap.values())); Mockito.when(gm.isOnline()).thenReturn(true); - Mockito.when(gm.getRSGroupOfTable(Mockito.any())) - .thenAnswer(new Answer<String>() { - @Override - public String answer(InvocationOnMock invocation) throws Throwable { - return tableMap.get(invocation.getArgument(0)); - } - }); return gm; } @@ -444,15 +427,16 @@ public class RSGroupableBalancerTestBase { TableName tableName = null; RSGroupInfoManager gm = getMockedGroupInfoManager(); RSGroupInfo groupOfServer = null; - for(RSGroupInfo gInfo : gm.listRSGroups()){ - if(gInfo.containsServer(sn.getAddress())){ + for (RSGroupInfo gInfo : gm.listRSGroups()) { + if (gInfo.containsServer(sn.getAddress())) { groupOfServer = gInfo; break; } } - for(TableDescriptor desc : tableDescs){ - if(gm.getRSGroupOfTable(desc.getTableName()).endsWith(groupOfServer.getName())){ + for (TableDescriptor desc : tableDescs.values()) { + Optional<String> optGroupName = desc.getRegionServerGroup(); + if (optGroupName.isPresent() && optGroupName.get().endsWith(groupOfServer.getName())) { tableName = desc.getTableName(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java index 4b88201..2345a39 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java @@ -96,33 +96,30 @@ public class TestRSGroupBasedLoadBalancer extends RSGroupableBalancerTestBase { /** * Tests the bulk assignment used during cluster startup. - * - * Round-robin. Should yield a balanced cluster so same invariant as the - * load balancer holds, all servers holding either floor(avg) or - * ceiling(avg). + * <p/> + * Round-robin. Should yield a balanced cluster so same invariant as the load balancer holds, all + * servers holding either floor(avg) or ceiling(avg). */ @Test public void testBulkAssignment() throws Exception { List<RegionInfo> regions = randomRegions(25); - Map<ServerName, List<RegionInfo>> assignments = loadBalancer - .roundRobinAssignment(regions, servers); - //test empty region/servers scenario - //this should not throw an NPE + Map<ServerName, List<RegionInfo>> assignments = + loadBalancer.roundRobinAssignment(regions, servers); + // test empty region/servers scenario + // this should not throw an NPE loadBalancer.roundRobinAssignment(regions, Collections.emptyList()); - //test regular scenario + // test regular scenario assertTrue(assignments.keySet().size() == servers.size()); for (ServerName sn : assignments.keySet()) { List<RegionInfo> regionAssigned = assignments.get(sn); for (RegionInfo region : regionAssigned) { TableName tableName = region.getTable(); String groupName = - getMockedGroupInfoManager().getRSGroupOfTable(tableName); + tableDescs.get(tableName).getRegionServerGroup().orElse(RSGroupInfo.DEFAULT_GROUP); assertTrue(StringUtils.isNotEmpty(groupName)); - RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup( - groupName); - assertTrue( - "Region is not correctly assigned to group servers.", - gInfo.containsServer(sn.getAddress())); + RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup(groupName); + assertTrue("Region is not correctly assigned to group servers.", + gInfo.containsServer(sn.getAddress())); } } ArrayListMultimap<String, ServerAndLoad> loadMap = convertToGroupBasedMap(assignments); @@ -158,24 +155,25 @@ public class TestRSGroupBasedLoadBalancer extends RSGroupableBalancerTestBase { onlineServers.addAll(servers); List<RegionInfo> regions = randomRegions(25); int bogusRegion = 0; - for(RegionInfo region : regions){ - String group = tableMap.get(region.getTable()); - if("dg3".equals(group) || "dg4".equals(group)){ + for (RegionInfo region : regions) { + String group = tableDescs.get(region.getTable()).getRegionServerGroup() + .orElse(RSGroupInfo.DEFAULT_GROUP); + if ("dg3".equals(group) || "dg4".equals(group)) { bogusRegion++; } } Set<Address> offlineServers = new HashSet<Address>(); offlineServers.addAll(groupMap.get("dg3").getServers()); offlineServers.addAll(groupMap.get("dg4").getServers()); - for(Iterator<ServerName> it = onlineServers.iterator(); it.hasNext();){ + for (Iterator<ServerName> it = onlineServers.iterator(); it.hasNext();) { ServerName server = it.next(); Address address = server.getAddress(); - if(offlineServers.contains(address)){ + if (offlineServers.contains(address)) { it.remove(); } } - Map<ServerName, List<RegionInfo>> assignments = loadBalancer - .roundRobinAssignment(regions, onlineServers); + Map<ServerName, List<RegionInfo>> assignments = + loadBalancer.roundRobinAssignment(regions, onlineServers); assertEquals(bogusRegion, assignments.get(LoadBalancer.BOGUS_SERVER_NAME).size()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java index e588a7e..a4ae636 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -32,7 +33,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.RegionMetrics; import org.apache.hadoop.hbase.ServerMetrics; import org.apache.hadoop.hbase.ServerName; @@ -98,7 +98,7 @@ public class TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal * Test HBASE-20791 */ @Test - public void testBalanceCluster() throws HBaseIOException { + public void testBalanceCluster() throws IOException { // mock cluster State Map<ServerName, List<RegionInfo>> clusterState = new HashMap<ServerName, List<RegionInfo>>(); ServerName serverA = servers.get(0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin1.java index 27511e3..7471458 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin1.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin1.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; - import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.NamespaceDescriptor; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin2.java index 4b4097f..076362e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin2.java @@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.rsgroup.RSGroupAdminServer.DEFAULT_MAX_RET import static org.apache.hadoop.hbase.util.Threads.sleep; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -408,7 +407,9 @@ public class TestRSGroupsAdmin2 extends TestRSGroupsBase { assertTrue(newGroupTables.contains(tableName)); // verify that all region still assgin on targetServer - Assert.assertEquals(5, getTableServerRegionMap().get(tableName).get(targetServer).size()); + // TODO: uncomment after we reimplement moveServersAndTables, now the implementation is + // moveServers first and then moveTables, so the region will be moved to other region servers. + // Assert.assertEquals(5, getTableServerRegionMap().get(tableName).get(targetServer).size()); assertTrue(observer.preMoveServersAndTables); assertTrue(observer.postMoveServersAndTables); @@ -503,61 +504,6 @@ public class TestRSGroupsAdmin2 extends TestRSGroupsBase { }); } - @Test - public void testFailedMoveBeforeRetryExhaustedWhenMoveTable() throws Exception { - final RSGroupInfo newGroup = addGroup(getGroupName(name.getMethodName()), 1); - Pair<ServerName, RegionStateNode> gotPair = createTableWithRegionSplitting(newGroup, - 5); - - // move table to group - Thread t2 = new Thread(() -> { - LOG.info("thread2 start running, to move regions"); - try { - rsGroupAdmin.moveTables(Sets.newHashSet(tableName), newGroup.getName()); - } catch (IOException e) { - LOG.error("move server error", e); - } - }); - t2.start(); - - // start thread to recover region state - final ServerName ss = gotPair.getFirst(); - final RegionStateNode rsn = gotPair.getSecond(); - AtomicBoolean changed = new AtomicBoolean(false); - - Thread t1 = recoverRegionStateThread(ss, server -> { - List<RegionInfo> regions = master.getAssignmentManager().getRegionsOnServer(ss); - List<RegionInfo> tableRegions = new ArrayList<>(); - for (RegionInfo regionInfo : regions) { - if (regionInfo.getTable().equals(tableName)) { - tableRegions.add(regionInfo); - } - } - return tableRegions; - }, rsn, changed); - t1.start(); - - t1.join(); - t2.join(); - - TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() { - @Override - public boolean evaluate() { - if (changed.get()) { - boolean serverHasTableRegions = false; - for (RegionInfo regionInfo : master.getAssignmentManager().getRegionsOnServer(ss)) { - if (regionInfo.getTable().equals(tableName)) { - serverHasTableRegions = true; - break; - } - } - return !serverHasTableRegions && !rsn.getRegionLocation().equals(ss); - } - return false; - } - }); - } - private <T> Thread recoverRegionStateThread(T owner, Function<T, List<RegionInfo>> getRegions, RegionStateNode rsn, AtomicBoolean changed){ return new Thread(() -> { @@ -653,50 +599,6 @@ public class TestRSGroupsAdmin2 extends TestRSGroupsBase { } @Test - public void testFailedMoveTablesAndRepair() throws Exception{ - // This UT calls moveTables() twice to test the idempotency of it. - // The first time, movement fails because a region is made in SPLITTING state - // which will not be moved. - // The second time, the region state is OPEN and check if all - // regions on target group servers after the call. - final RSGroupInfo newGroup = addGroup(getGroupName(name.getMethodName()), 1); - Iterator iterator = newGroup.getServers().iterator(); - Address newGroupServer1 = (Address) iterator.next(); - - // create table - // randomly set a region state to SPLITTING to make move abort - Pair<ServerName, RegionStateNode> gotPair = createTableWithRegionSplitting(newGroup, - new Random().nextInt(8) + 4); - RegionStateNode rsn = gotPair.getSecond(); - - // move table to newGroup and check regions - try { - rsGroupAdmin.moveTables(Sets.newHashSet(tableName), newGroup.getName()); - fail("should get IOException when retry exhausted but there still exists failed moved " - + "regions"); - }catch (Exception e){ - assertTrue(e.getMessage().contains( - gotPair.getSecond().getRegionInfo().getRegionNameAsString())); - } - for(RegionInfo regionInfo : master.getAssignmentManager().getAssignedRegions()){ - if (regionInfo.getTable().equals(tableName) && regionInfo.equals(rsn.getRegionInfo())) { - assertNotEquals(master.getAssignmentManager().getRegionStates() - .getRegionServerOfRegion(regionInfo).getAddress(), newGroupServer1); - } - } - - // retry move table to newGroup and check if all regions are corrected - rsn.setState(RegionState.State.OPEN); - rsGroupAdmin.moveTables(Sets.newHashSet(tableName), newGroup.getName()); - for(RegionInfo regionInfo : master.getAssignmentManager().getAssignedRegions()){ - if (regionInfo.getTable().equals(tableName)) { - assertEquals(master.getAssignmentManager().getRegionStates() - .getRegionServerOfRegion(regionInfo).getAddress(), newGroupServer1); - } - } - } - - @Test public void testFailedMoveServersAndRepair() throws Exception{ // This UT calls moveServers() twice to test the idempotency of it. // The first time, movement fails because a region is made in SPLITTING state diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBalance.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBalance.java index 67f5c7e..8d10850 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBalance.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBalance.java @@ -45,8 +45,6 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.collect.Sets; - @Category({ MediumTests.class }) public class TestRSGroupsBalance extends TestRSGroupsBase { @@ -153,19 +151,21 @@ public class TestRSGroupsBalance extends TestRSGroupsBase { @Test public void testMisplacedRegions() throws Exception { - final TableName tableName = TableName.valueOf(tablePrefix + "_testMisplacedRegions"); - LOG.info("testMisplacedRegions"); + String namespace = tablePrefix + "_" + name.getMethodName(); + TEST_UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build()); + final TableName tableName = + TableName.valueOf(namespace, tablePrefix + "_" + name.getMethodName()); + LOG.info(name.getMethodName()); - final RSGroupInfo RSGroupInfo = addGroup("testMisplacedRegions", 1); + final RSGroupInfo rsGroupInfo = addGroup(name.getMethodName(), 1); TEST_UTIL.createMultiRegionTable(tableName, new byte[] { 'f' }, 15); TEST_UTIL.waitUntilAllRegionsAssigned(tableName); - - rsGroupAdminEndpoint.getGroupInfoManager().moveTables(Sets.newHashSet(tableName), - RSGroupInfo.getName()); + TEST_UTIL.getAdmin().modifyNamespace(NamespaceDescriptor.create(namespace) + .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, rsGroupInfo.getName()).build()); admin.balancerSwitch(true, true); - assertTrue(rsGroupAdmin.balanceRSGroup(RSGroupInfo.getName())); + assertTrue(rsGroupAdmin.balanceRSGroup(rsGroupInfo.getName())); admin.balancerSwitch(false, true); assertTrue(observer.preBalanceRSGroupCalled); assertTrue(observer.postBalanceRSGroupCalled); @@ -174,7 +174,7 @@ public class TestRSGroupsBalance extends TestRSGroupsBase { @Override public boolean evaluate() throws Exception { ServerName serverName = - ServerName.valueOf(RSGroupInfo.getServers().iterator().next().toString(), 1); + ServerName.valueOf(rsGroupInfo.getServers().iterator().next().toString(), 1); return admin.getConnection().getAdmin().getRegions(serverName).size() == 15; } }); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java index 0e4fb34..b91cd5e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java @@ -74,7 +74,7 @@ public abstract class TestRSGroupsBase { protected static HBaseTestingUtility TEST_UTIL; protected static Admin admin; protected static HBaseCluster cluster; - protected static RSGroupAdmin rsGroupAdmin; + protected static RSGroupAdminClient rsGroupAdmin; protected static HMaster master; protected boolean INIT = false; protected static RSGroupAdminEndpoint rsGroupAdminEndpoint; @@ -188,8 +188,8 @@ public abstract class TestRSGroupsBase { RSGroupInfo defaultInfo = rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP); rsGroupAdmin.addRSGroup(groupName); Set<Address> set = new HashSet<>(); - for(Address server: defaultInfo.getServers()) { - if(set.size() == serverCount) { + for (Address server : defaultInfo.getServers()) { + if (set.size() == serverCount) { break; } set.add(server); @@ -222,7 +222,7 @@ public abstract class TestRSGroupsBase { } protected void deleteGroups() throws IOException { - RSGroupAdmin groupAdmin = new RSGroupAdminClient(TEST_UTIL.getConnection()); + RSGroupAdminClient groupAdmin = new RSGroupAdminClient(TEST_UTIL.getConnection()); for(RSGroupInfo group: groupAdmin.listRSGroups()) { if(!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { groupAdmin.moveTables(group.getTables(), RSGroupInfo.DEFAULT_GROUP); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsOfflineMode.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsOfflineMode.java index 60887e4..d3577f2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsOfflineMode.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsOfflineMode.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.rsgroup; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -113,7 +112,7 @@ public class TestRSGroupsOfflineMode { final HRegionServer groupRS = ((MiniHBaseCluster) cluster).getRegionServer(1); final HRegionServer failoverRS = ((MiniHBaseCluster) cluster).getRegionServer(2); String newGroup = "my_group"; - RSGroupAdmin groupAdmin = new RSGroupAdminClient(TEST_UTIL.getConnection()); + RSGroupAdminClient groupAdmin = new RSGroupAdminClient(TEST_UTIL.getConnection()); groupAdmin.addRSGroup(newGroup); if (master.getAssignmentManager().getRegionStates().getRegionAssignments() .containsValue(failoverRS.getServerName())) { @@ -168,9 +167,6 @@ public class TestRSGroupsOfflineMode { .getMasterCoprocessorHost().findCoprocessor(RSGroupAdminEndpoint.class).getGroupInfoManager(); // Make sure balancer is in offline mode, since this is what we're testing. assertFalse(groupMgr.isOnline()); - // Verify the group affiliation that's loaded from ZK instead of tables. - assertEquals(newGroup, groupMgr.getRSGroupOfTable(RSGroupInfoManagerImpl.RSGROUP_TABLE_NAME)); - assertEquals(RSGroupInfo.DEFAULT_GROUP, groupMgr.getRSGroupOfTable(failoverTable)); // Kill final regionserver to see the failover happens for all tables except GROUP table since // it's group does not have any online RS. killRS.stop("die"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdminClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdminClient.java index fcaf1a7..a8cd277 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdminClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdminClient.java @@ -17,17 +17,26 @@ */ package org.apache.hadoop.hbase.rsgroup; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.io.ByteArrayInputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -37,22 +46,20 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; -import org.junit.Assert; import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.common.collect.Sets; @InterfaceAudience.Private -public class VerifyingRSGroupAdminClient implements RSGroupAdmin { - private Table table; +public class VerifyingRSGroupAdminClient extends RSGroupAdminClient { + private Connection conn; private ZKWatcher zkw; - private RSGroupAdmin wrapped; + private RSGroupAdminClient wrapped; - public VerifyingRSGroupAdminClient(RSGroupAdmin RSGroupAdmin, Configuration conf) + public VerifyingRSGroupAdminClient(RSGroupAdminClient RSGroupAdmin, Configuration conf) throws IOException { wrapped = RSGroupAdmin; - table = ConnectionFactory.createConnection(conf) - .getTable(RSGroupInfoManagerImpl.RSGROUP_TABLE_NAME); + conn = ConnectionFactory.createConnection(conf); zkw = new ZKWatcher(conf, this.getClass().getSimpleName(), null); } @@ -121,31 +128,41 @@ public class VerifyingRSGroupAdminClient implements RSGroupAdmin { public void verify() throws IOException { Map<String, RSGroupInfo> groupMap = Maps.newHashMap(); Set<RSGroupInfo> zList = Sets.newHashSet(); - - for (Result result : table.getScanner(new Scan())) { - RSGroupProtos.RSGroupInfo proto = - RSGroupProtos.RSGroupInfo.parseFrom( - result.getValue( - RSGroupInfoManagerImpl.META_FAMILY_BYTES, - RSGroupInfoManagerImpl.META_QUALIFIER_BYTES)); - groupMap.put(proto.getName(), ProtobufUtil.toGroupInfo(proto)); + List<TableDescriptor> tds = new ArrayList<>(); + try (Admin admin = conn.getAdmin()) { + tds.addAll(admin.listTableDescriptors()); + tds.addAll(admin.listTableDescriptorsByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME)); + } + try (Table table = conn.getTable(RSGroupInfoManagerImpl.RSGROUP_TABLE_NAME); + ResultScanner scanner = table.getScanner(new Scan())) { + for (;;) { + Result result = scanner.next(); + if (result == null) { + break; + } + RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo.parseFrom(result.getValue( + RSGroupInfoManagerImpl.META_FAMILY_BYTES, RSGroupInfoManagerImpl.META_QUALIFIER_BYTES)); + RSGroupInfo rsGroupInfo = ProtobufUtil.toGroupInfo(proto); + groupMap.put(proto.getName(), RSGroupUtil.fillTables(rsGroupInfo, tds)); + } } - Assert.assertEquals(Sets.newHashSet(groupMap.values()), - Sets.newHashSet(wrapped.listRSGroups())); + assertEquals(Sets.newHashSet(groupMap.values()), Sets.newHashSet(wrapped.listRSGroups())); try { String groupBasePath = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode, "rsgroup"); - for(String znode: ZKUtil.listChildrenNoWatch(zkw, groupBasePath)) { + for (String znode : ZKUtil.listChildrenNoWatch(zkw, groupBasePath)) { byte[] data = ZKUtil.getData(zkw, ZNodePaths.joinZNode(groupBasePath, znode)); - if(data.length > 0) { + if (data.length > 0) { ProtobufUtil.expectPBMagicPrefix(data); - ByteArrayInputStream bis = new ByteArrayInputStream( - data, ProtobufUtil.lengthOfPBMagic(), data.length); - zList.add(ProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis))); + ByteArrayInputStream bis = + new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length); + RSGroupInfo rsGroupInfo = + ProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)); + zList.add(RSGroupUtil.fillTables(rsGroupInfo, tds)); } } - Assert.assertEquals(zList.size(), groupMap.size()); - for(RSGroupInfo RSGroupInfo : zList) { - Assert.assertTrue(groupMap.get(RSGroupInfo.getName()).equals(RSGroupInfo)); + assertEquals(zList.size(), groupMap.size()); + for (RSGroupInfo rsGroupInfo : zList) { + assertTrue(groupMap.get(rsGroupInfo.getName()).equals(rsGroupInfo)); } } catch (KeeperException e) { throw new IOException("ZK verification failed", e);