http://git-wip-us.apache.org/repos/asf/hbase/blob/52f05079/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java new file mode 100644 index 0000000..00cd6b0 --- /dev/null +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java @@ -0,0 +1,955 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.rsgroup; + +import com.google.common.collect.Sets; +import com.google.common.net.HostAndPort; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.ProcedureInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin.MasterSwitchType; +import org.apache.hadoop.hbase.constraint.ConstraintException; +import org.apache.hadoop.hbase.coprocessor.CoprocessorService; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.MasterObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse; +import 
org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveTablesRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveTablesResponse;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RSGroupAdminService;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotDescription;

/**
 * Master coprocessor that exposes the {@link RSGroupAdminService} protobuf service and
 * forwards every request to an {@link RSGroupAdminServer}.
 *
 * <p>Besides the RPC entry points, the class implements {@link MasterObserver}. Only four hooks
 * do any work: {@link #preCreateTable}, {@link #postDeleteTable}, {@link #preCreateNamespace}
 * and {@link #preModifyNamespace}. All remaining hooks are intentionally empty.
 *
 * <p>RPC error handling: an {@link IOException} raised by the admin server is attached to the
 * {@link RpcController} via {@link ResponseConverter#setControllerException}, and the callback
 * is still invoked (with {@code null} unless noted otherwise on the method).
 */
public class RSGroupAdminEndpoint extends RSGroupAdminService
    implements CoprocessorService, Coprocessor, MasterObserver {

  private MasterServices master = null;

  // NOTE(review): static although it is assigned from the instance method start(); every
  // endpoint instance therefore shares the most recently created manager. Left unchanged.
  private static RSGroupInfoManagerImpl groupInfoManager;
  private RSGroupAdminServer groupAdminServer;

  /**
   * Initializes the endpoint from the master coprocessor environment.
   *
   * @param env the coprocessor environment; cast to {@link MasterCoprocessorEnvironment}
   * @throws IOException if no load balancer class is configured under
   *     {@link HConstants#HBASE_MASTER_LOADBALANCER_CLASS} or the configured class is not an
   *     {@link RSGroupableBalancer}
   */
  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    MasterCoprocessorEnvironment menv = (MasterCoprocessorEnvironment) env;
    master = menv.getMasterServices();

    // Validate the balancer first so that a misconfiguration does not leave a half-initialized
    // endpoint behind. getClass(..., null) yields null when the property is unset, and
    // Class.isAssignableFrom(null) would throw NullPointerException, hence the explicit check.
    Class<?> balancerClass =
        master.getConfiguration().getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, null);
    if (balancerClass == null || !RSGroupableBalancer.class.isAssignableFrom(balancerClass)) {
      throw new IOException("Configured balancer is not a GroupableBalancer");
    }

    groupInfoManager = new RSGroupInfoManagerImpl(master);
    groupAdminServer = new RSGroupAdminServer(master, groupInfoManager);
  }

  /** No resources are released here; the method is intentionally empty. */
  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
  }

  /** @return this object, which is itself the protobuf service implementation */
  @Override
  public Service getService() {
    return this;
  }

  /** @return the group info manager created by {@link #start}; {@code null} before that */
  public RSGroupInfoManager getGroupInfoManager() {
    return groupInfoManager;
  }

  /**
   * Looks up a group by name. The response carries no group info when the group is unknown.
   * On {@link IOException} the callback receives {@code null}.
   */
  @Override
  public void getRSGroupInfo(RpcController controller,
      GetRSGroupInfoRequest request,
      RpcCallback<GetRSGroupInfoResponse> done) {
    GetRSGroupInfoResponse response = null;
    try {
      GetRSGroupInfoResponse.Builder builder = GetRSGroupInfoResponse.newBuilder();
      RSGroupInfo groupInfo = groupAdminServer.getRSGroupInfo(request.getRSGroupName());
      if (groupInfo != null) {
        builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(groupInfo));
      }
      response = builder.build();
    } catch (IOException e) {
      ResponseConverter.setControllerException(controller, e);
    }
    done.run(response);
  }

  /**
   * Looks up the group a table belongs to. The response carries no group info when none is
   * found. On {@link IOException} the callback receives {@code null}.
   */
  @Override
  public void getRSGroupInfoOfTable(RpcController controller,
      GetRSGroupInfoOfTableRequest request,
      RpcCallback<GetRSGroupInfoOfTableResponse> done) {
    GetRSGroupInfoOfTableResponse response = null;
    try {
      GetRSGroupInfoOfTableResponse.Builder builder =
          GetRSGroupInfoOfTableResponse.newBuilder();
      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
      RSGroupInfo groupInfo = groupAdminServer.getRSGroupInfoOfTable(tableName);
      if (groupInfo != null) {
        builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(groupInfo));
      }
      response = builder.build();
    } catch (IOException e) {
      ResponseConverter.setControllerException(controller, e);
    }
    done.run(response);
  }

  /**
   * Moves the requested servers (converted to host/port pairs) into the target group.
   * On {@link IOException} the callback receives {@code null}.
   */
  @Override
  public void moveServers(RpcController controller,
      MoveServersRequest request,
      RpcCallback<MoveServersResponse> done) {
    MoveServersResponse response = null;
    try {
      MoveServersResponse.Builder builder = MoveServersResponse.newBuilder();
      Set<HostAndPort> hostPorts = Sets.newHashSet();
      for (HBaseProtos.ServerName el : request.getServersList()) {
        hostPorts.add(HostAndPort.fromParts(el.getHostName(), el.getPort()));
      }
      groupAdminServer.moveServers(hostPorts, request.getTargetGroup());
      response = builder.build();
    } catch (IOException e) {
      ResponseConverter.setControllerException(controller, e);
    }
    done.run(response);
  }

  /**
   * Moves the requested tables into the target group.
   * On {@link IOException} the callback receives {@code null}.
   */
  @Override
  public void moveTables(RpcController controller,
      MoveTablesRequest request,
      RpcCallback<MoveTablesResponse> done) {
    MoveTablesResponse response = null;
    try {
      MoveTablesResponse.Builder builder = MoveTablesResponse.newBuilder();
      Set<TableName> tables = new HashSet<TableName>(request.getTableNameList().size());
      for (HBaseProtos.TableName tableName : request.getTableNameList()) {
        tables.add(ProtobufUtil.toTableName(tableName));
      }
      groupAdminServer.moveTables(tables, request.getTargetGroup());
      response = builder.build();
    } catch (IOException e) {
      ResponseConverter.setControllerException(controller, e);
    }
    done.run(response);
  }

  /**
   * Creates a group with the requested name.
   * On {@link IOException} the callback receives {@code null}.
   */
  @Override
  public void addRSGroup(RpcController controller,
      AddRSGroupRequest request,
      RpcCallback<AddRSGroupResponse> done) {
    AddRSGroupResponse response = null;
    try {
      AddRSGroupResponse.Builder builder = AddRSGroupResponse.newBuilder();
      groupAdminServer.addRSGroup(request.getRSGroupName());
      response = builder.build();
    } catch (IOException e) {
      ResponseConverter.setControllerException(controller, e);
    }
    done.run(response);
  }

  /**
   * Removes the group with the requested name.
   * On {@link IOException} the callback receives {@code null}.
   */
  @Override
  public void removeRSGroup(RpcController controller,
      RemoveRSGroupRequest request,
      RpcCallback<RemoveRSGroupResponse> done) {
    RemoveRSGroupResponse response = null;
    try {
      RemoveRSGroupResponse.Builder builder = RemoveRSGroupResponse.newBuilder();
      groupAdminServer.removeRSGroup(request.getRSGroupName());
      response = builder.build();
    } catch (IOException e) {
      ResponseConverter.setControllerException(controller, e);
    }
    done.run(response);
  }

  /**
   * Balances the requested group and reports whether the balancer ran. Unlike most handlers,
   * a response is always delivered: on {@link IOException} it carries {@code balanceRan=false}.
   */
  @Override
  public void balanceRSGroup(RpcController controller,
      BalanceRSGroupRequest request,
      RpcCallback<BalanceRSGroupResponse> done) {
    BalanceRSGroupResponse.Builder builder = BalanceRSGroupResponse.newBuilder();
    try {
      builder.setBalanceRan(groupAdminServer.balanceRSGroup(request.getRSGroupName()));
    } catch (IOException e) {
      ResponseConverter.setControllerException(controller, e);
      builder.setBalanceRan(false);
    }
    done.run(builder.build());
  }

  /**
   * Lists every group known to the admin server.
   * On {@link IOException} the callback receives {@code null}.
   */
  @Override
  public void listRSGroupInfos(RpcController controller,
      ListRSGroupInfosRequest request,
      RpcCallback<ListRSGroupInfosResponse> done) {
    ListRSGroupInfosResponse response = null;
    try {
      ListRSGroupInfosResponse.Builder builder = ListRSGroupInfosResponse.newBuilder();
      for (RSGroupInfo groupInfo : groupAdminServer.listRSGroups()) {
        builder.addRSGroupInfo(ProtobufUtil.toProtoGroupInfo(groupInfo));
      }
      response = builder.build();
    } catch (IOException e) {
      ResponseConverter.setControllerException(controller, e);
    }
    done.run(response);
  }

  /**
   * Looks up the group a server belongs to. A response is always delivered; it carries no
   * group info when the server has no group or when an {@link IOException} occurred.
   */
  @Override
  public void getRSGroupInfoOfServer(RpcController controller,
      GetRSGroupInfoOfServerRequest request,
      RpcCallback<GetRSGroupInfoOfServerResponse> done) {
    GetRSGroupInfoOfServerResponse.Builder builder = GetRSGroupInfoOfServerResponse.newBuilder();
    try {
      HostAndPort hp =
          HostAndPort.fromParts(request.getServer().getHostName(), request.getServer().getPort());
      RSGroupInfo groupInfo = groupAdminServer.getRSGroupOfServer(hp);
      if (groupInfo != null) {
        builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(groupInfo));
      }
    } catch (IOException e) {
      ResponseConverter.setControllerException(controller, e);
    }
    done.run(builder.build());
  }

  /** Delegates to {@link RSGroupAdminServer#prepareRSGroupForTable} before table creation. */
  @Override
  public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
      HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
    groupAdminServer.prepareRSGroupForTable(desc);
  }

  /** Delegates to {@link RSGroupAdminServer#cleanupRSGroupForTable} after table deletion. */
  @Override
  public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
    groupAdminServer.cleanupRSGroupForTable(tableName);
  }

  //unused cp hooks
  // Everything below is an empty MasterObserver implementation unless documented otherwise.

  @Override
  public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
      HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
  }

  @Override
  public void preCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
  }

  @Override
  public void postCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
  }

  @Override
  public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }

  @Override
  public void preDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }

  @Override
  public void postDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }

  @Override
  public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }

  @Override
  public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }

  @Override
  public void preTruncateTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }

  @Override
  public void postTruncateTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }

  @Override
  public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, HTableDescriptor htd) throws IOException {
  }

  @Override
  public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, HTableDescriptor htd) throws IOException {
  }

  @Override
  public void preModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, HTableDescriptor htd) throws IOException {
  }

  @Override
  public void postModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, HTableDescriptor htd) throws IOException {
  }

  @Override
  public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, HColumnDescriptor column) throws IOException {
  }

  @Override
  public void postAddColumn(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, HColumnDescriptor column) throws IOException {
  }

  @Override
  public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, HColumnDescriptor column) throws IOException {
  }

  @Override
  public void postAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, HColumnDescriptor column) throws IOException {
  }

  @Override
  public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, HColumnDescriptor descriptor) throws IOException {
  }

  @Override
  public void postModifyColumn(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, HColumnDescriptor descriptor) throws IOException {
  }

  @Override
  public void preModifyColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, HColumnDescriptor descriptor) throws IOException {
  }

  @Override
  public void postModifyColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, HColumnDescriptor descriptor) throws IOException {
  }

  @Override
  public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, byte[] c) throws IOException {
  }

  @Override
  public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, byte[] c) throws IOException {
  }

  @Override
  public void preDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, byte[] c) throws IOException {
  }

  @Override
  public void postDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, byte[] c) throws IOException {
  }

  @Override
  public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }

  @Override
  public void postEnableTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }

  @Override
  public void preEnableTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }

  @Override
  public void postEnableTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }

  @Override
  public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }

  @Override
  public void postDisableTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }

  @Override
  public void preDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }

  @Override
  public void postDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }

  @Override
  public void preMove(ObserverContext<MasterCoprocessorEnvironment> ctx, HRegionInfo region,
      ServerName srcServer, ServerName destServer) throws IOException {
  }

  @Override
  public void postMove(ObserverContext<MasterCoprocessorEnvironment> ctx, HRegionInfo region,
      ServerName srcServer, ServerName destServer) throws IOException {
  }

  @Override
  public void preAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx,
      ProcedureExecutor<MasterProcedureEnv> procEnv, long procId) throws IOException {
  }

  @Override
  public void postAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx)
      throws IOException {
  }

  @Override
  public void preListProcedures(ObserverContext<MasterCoprocessorEnvironment> ctx)
      throws IOException {
  }

  @Override
  public void postListProcedures(ObserverContext<MasterCoprocessorEnvironment> ctx,
      List<ProcedureInfo> procInfoList) throws IOException {
  }

  @Override
  public void preAssign(ObserverContext<MasterCoprocessorEnvironment> ctx,
      HRegionInfo regionInfo) throws IOException {
  }

  @Override
  public void postAssign(ObserverContext<MasterCoprocessorEnvironment> ctx,
      HRegionInfo regionInfo) throws IOException {
  }

  @Override
  public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> ctx,
      HRegionInfo regionInfo, boolean force) throws IOException {
  }

  @Override
  public void postUnassign(ObserverContext<MasterCoprocessorEnvironment> ctx,
      HRegionInfo regionInfo, boolean force) throws IOException {
  }

  @Override
  public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> ctx,
      HRegionInfo regionInfo) throws IOException {
  }

  @Override
  public void postRegionOffline(ObserverContext<MasterCoprocessorEnvironment> ctx,
      HRegionInfo regionInfo) throws IOException {
  }

  @Override
  public void preBalance(ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
  }

  @Override
  public void postBalance(ObserverContext<MasterCoprocessorEnvironment> ctx,
      List<RegionPlan> plans) throws IOException {
  }

  /**
   * Always returns {@code false}, as the original implementation did.
   * NOTE(review): presumably {@code false} means "do not bypass the default handling";
   * confirm against the MasterObserver contract.
   */
  @Override
  public boolean preSetSplitOrMergeEnabled(ObserverContext<MasterCoprocessorEnvironment> ctx,
      boolean newValue, MasterSwitchType switchType) throws IOException {
    return false;
  }

  @Override
  public void postSetSplitOrMergeEnabled(ObserverContext<MasterCoprocessorEnvironment> ctx,
      boolean newValue, MasterSwitchType switchType) throws IOException {
  }

  /**
   * Passes the requested balancer switch value through unchanged.
   *
   * <p>The previous implementation returned a constant {@code false}. NOTE(review): the master
   * is expected to use this hook's return value as the effective new switch state, in which
   * case a constant {@code false} would prevent the balancer from ever being enabled while this
   * coprocessor is loaded; confirm against the MasterObserver contract.
   *
   * @return {@code newValue}
   */
  @Override
  public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> ctx,
      boolean newValue) throws IOException {
    return newValue;
  }

  @Override
  public void postBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> ctx,
      boolean oldValue, boolean newValue) throws IOException {
  }

  @Override
  public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
  }

  @Override
  public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
      throws IOException {
  }

  @Override
  public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
      throws IOException {
  }

  @Override
  public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> ctx)
      throws IOException {
  }

  @Override
  public void preSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
      SnapshotDescription snapshot, HTableDescriptor hTableDescriptor) throws IOException {
  }

  @Override
  public void postSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
      SnapshotDescription snapshot, HTableDescriptor hTableDescriptor) throws IOException {
  }

  @Override
  public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
      SnapshotDescription snapshot) throws IOException {
  }

  @Override
  public void postListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
      SnapshotDescription snapshot) throws IOException {
  }

  @Override
  public void preCloneSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
      SnapshotDescription snapshot, HTableDescriptor hTableDescriptor) throws IOException {
  }

  @Override
  public void postCloneSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
      SnapshotDescription snapshot, HTableDescriptor hTableDescriptor) throws IOException {
  }

  @Override
  public void preRestoreSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
      SnapshotDescription snapshot, HTableDescriptor hTableDescriptor) throws IOException {
  }

  @Override
  public void postRestoreSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
      SnapshotDescription snapshot, HTableDescriptor hTableDescriptor) throws IOException {
  }

  @Override
  public void preDeleteSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
      SnapshotDescription snapshot) throws IOException {
  }

  @Override
  public void postDeleteSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
      SnapshotDescription snapshot) throws IOException {
  }

  @Override
  public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
      List<TableName> tableNamesList, List<HTableDescriptor> descriptors) throws IOException {
  }

  @Override
  public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
      List<HTableDescriptor> descriptors) throws IOException {
  }

  @Override
  public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
      List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
      String regex) throws IOException {
  }

  @Override
  public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
      List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
      String regex) throws IOException {
  }

  @Override
  public void preGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
      List<HTableDescriptor> descriptors, String regex) throws IOException {
  }

  @Override
  public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
      List<HTableDescriptor> descriptors, String regex) throws IOException {
  }

  /**
   * Rejects a namespace whose {@link RSGroupInfo#NAMESPACEDESC_PROP_GROUP} configuration value
   * names a group the admin server does not know. Namespaces without that value pass.
   *
   * @throws ConstraintException if the referenced group does not exist
   */
  @Override
  public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
      NamespaceDescriptor ns) throws IOException {
    String group = ns.getConfigurationValue(RSGroupInfo.NAMESPACEDESC_PROP_GROUP);
    if (group != null && groupAdminServer.getRSGroupInfo(group) == null) {
      throw new ConstraintException("Region server group " + group + " does not exist");
    }
  }

  @Override
  public void postCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
      NamespaceDescriptor ns) throws IOException {
  }

  @Override
  public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
      String namespace) throws IOException {
  }

  @Override
  public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
      String namespace) throws IOException {
  }

  /** Applies the same group-existence check as {@link #preCreateNamespace}. */
  @Override
  public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
      NamespaceDescriptor ns) throws IOException {
    preCreateNamespace(ctx, ns);
  }

  @Override
  public void postModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
      NamespaceDescriptor ns) throws IOException {
  }

  @Override
  public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
      String namespace) throws IOException {
  }

  @Override
  public void postGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
      NamespaceDescriptor ns) throws IOException {
  }

  @Override
  public void preListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
      List<NamespaceDescriptor> descriptors) throws IOException {
  }

  @Override
  public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
      List<NamespaceDescriptor> descriptors) throws IOException {
  }

  @Override
  public void preTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }

  @Override
  public void postTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
  }

  @Override
  public void preSetUserQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
      String userName, Quotas quotas) throws IOException {
  }

  @Override
  public void postSetUserQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
      String userName, Quotas quotas) throws IOException {
  }

  @Override
  public void preSetUserQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
      String userName, TableName tableName, Quotas quotas) throws IOException {
  }

  @Override
  public void postSetUserQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
      String userName, TableName tableName, Quotas quotas) throws IOException {
  }

  @Override
  public void preSetUserQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
      String userName, String namespace, Quotas quotas) throws IOException {
  }

  @Override
  public void postSetUserQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
      String userName, String namespace, Quotas quotas) throws IOException {
  }

  @Override
  public void preSetTableQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, Quotas quotas) throws IOException {
  }

  @Override
  public void postSetTableQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName, Quotas quotas) throws IOException {
  }

  @Override
  public void preSetNamespaceQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
      String namespace, Quotas quotas) throws IOException {
  }

  @Override
  public void postSetNamespaceQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
      String namespace, Quotas quotas) throws IOException {
  }

  @Override
  public void preDispatchMerge(ObserverContext<MasterCoprocessorEnvironment> ctx,
      HRegionInfo regionA, HRegionInfo regionB) throws IOException {
  }

  @Override
  public void postDispatchMerge(ObserverContext<MasterCoprocessorEnvironment> c,
      HRegionInfo regionA, HRegionInfo regionB) throws IOException {
  }

  @Override
  public void preMoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
      Set<HostAndPort> servers, String targetGroup) throws IOException {
  }

  @Override
  public void postMoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
      Set<HostAndPort> servers, String targetGroup) throws IOException {
  }

  @Override
  public void preMoveTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
      Set<TableName> tables, String targetGroup) throws IOException {
  }

  @Override
  public void postMoveTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
      Set<TableName> tables, String targetGroup) throws IOException {
  }

  @Override
  public void preAddRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
      String name) throws IOException {
  }

  @Override
  public void postAddRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
      String name) throws IOException {
  }

  @Override
  public void preRemoveRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
      String name) throws IOException {
  }

  @Override
  public void postRemoveRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
      String name) throws IOException {
  }

  @Override
  public void preBalanceRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
      String groupName) throws IOException {
  }

  @Override
  public void postBalanceRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
      String groupName, boolean balancerRan) throws IOException {
  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/52f05079/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java ---------------------------------------------------------------------- diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java new file mode 100644 index 0000000..43ac3ad --- /dev/null +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java @@ -0,0 +1,503 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.hadoop.hbase.rsgroup;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.net.HostAndPort;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;

/**
 * Service to support Region Server Grouping (HBase-6721).
 *
 * <p>Implements the {@link RSGroupAdmin} operations on top of a
 * {@link RSGroupInfoManager} (group membership storage) and the
 * {@link MasterServices} of the hosting master (region assignment, locks,
 * coprocessor hooks).
 */
@InterfaceAudience.Private
public class RSGroupAdminServer extends RSGroupAdmin {
  private static final Log LOG = LogFactory.getLog(RSGroupAdminServer.class);

  private final MasterServices master;
  // Servers that are currently being moved from one group to another.
  // Key = host:port, Value = target group name.
  private final ConcurrentMap<HostAndPort, String> serversInTransition =
      new ConcurrentHashMap<HostAndPort, String>();
  private final RSGroupInfoManager rsGroupInfoManager;

  /**
   * @param master services of the master this admin runs in
   * @param RSGroupInfoManager store used to read and update group membership
   * @throws IOException declared for callers; not thrown by the visible code
   */
  public RSGroupAdminServer(MasterServices master,
      RSGroupInfoManager RSGroupInfoManager) throws IOException {
    this.master = master;
    this.rsGroupInfoManager = RSGroupInfoManager;
  }

  /**
   * Looks up a group by name.
   *
   * @param groupName name of the group
   * @return whatever the manager returns for that name (callers in this class
   *         treat {@code null} as "group does not exist")
   */
  @Override
  public RSGroupInfo getRSGroupInfo(String groupName) throws IOException {
    return getRSGroupInfoManager().getRSGroup(groupName);
  }

  /**
   * Looks up the group a table belongs to.
   *
   * @param tableName table to look up
   * @return the group of the table, or {@code null} when the manager reports
   *         no group name for it
   */
  @Override
  public RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException {
    String groupName = getRSGroupInfoManager().getRSGroupOfTable(tableName);
    if (groupName == null) {
      return null;
    }
    return getRSGroupInfoManager().getRSGroup(groupName);
  }

  /**
   * Moves a set of servers, all from the same source group, into
   * {@code targetGroupName} and then unassigns the regions they host that do
   * not belong to tables of the target group. Loops (sleeping one second per
   * round) until no such region is left on the moved servers.
   *
   * @param servers servers to move; an empty set is a no-op
   * @param targetGroupName name of the destination group
   * @throws ConstraintException if an argument is null/empty, the target group
   *         does not exist, a server has no group, a server from the default
   *         group is not online, the move would leave a group with tables but
   *         no servers, a server is already being moved, the servers span more
   *         than one source group, or source and target are the same group
   * @throws IOException on failures reported by the manager or the master
   */
  @Override
  public void moveServers(Set<HostAndPort> servers, String targetGroupName)
      throws IOException {
    if (servers == null) {
      throw new ConstraintException(
          "The list of servers cannot be null.");
    }
    if (StringUtils.isEmpty(targetGroupName)) {
      throw new ConstraintException("The target group cannot be null.");
    }
    if (servers.size() < 1) {
      return;
    }

    RSGroupInfo targetGrp = getRSGroupInfo(targetGroupName);
    if (targetGrp == null) {
      throw new ConstraintException("Group does not exist: "+targetGroupName);
    }

    RSGroupInfoManager manager = getRSGroupInfoManager();
    synchronized (manager) {
      if (master.getMasterCoprocessorHost() != null) {
        master.getMasterCoprocessorHost().preMoveServers(servers, targetGroupName);
      }
      HostAndPort firstServer = servers.iterator().next();
      // We only allow a move from a single source group, so looking at the
      // first server is enough to find the source group.
      RSGroupInfo srcGrp = manager.getRSGroupOfServer(firstServer);
      // Only move online servers (from default) or servers from other groups;
      // this prevents bogus servers from entering groups.
      if (srcGrp == null) {
        throw new ConstraintException(
            "Server "+firstServer+" does not have a group.");
      }
      if (RSGroupInfo.DEFAULT_GROUP.equals(srcGrp.getName())) {
        Set<HostAndPort> onlineServers = new HashSet<HostAndPort>();
        for (ServerName server : master.getServerManager().getOnlineServers().keySet()) {
          onlineServers.add(server.getHostPort());
        }
        for (HostAndPort el : servers) {
          if (!onlineServers.contains(el)) {
            throw new ConstraintException(
                "Server "+el+" is not an online server in default group.");
          }
        }
      }

      if (srcGrp.getServers().size() <= servers.size() &&
          srcGrp.getTables().size() > 0) {
        throw new ConstraintException("Cannot leave a group "+srcGrp.getName()+
            " that contains tables " +"without servers.");
      }

      String sourceGroupName = manager
          .getRSGroupOfServer(srcGrp.getServers().iterator().next()).getName();
      // Re-check under the manager lock; the first check ran unsynchronized.
      if (getRSGroupInfo(targetGroupName) == null) {
        throw new ConstraintException("Target group does not exist: "+targetGroupName);
      }

      for (HostAndPort server : servers) {
        if (serversInTransition.containsKey(server)) {
          throw new ConstraintException(
              "Server list contains a server that is already being moved: "+server);
        }
        RSGroupInfo serverGroup = manager.getRSGroupOfServer(server);
        if (serverGroup == null) {
          // Same failure as for the first server, instead of an NPE below.
          throw new ConstraintException(
              "Server "+server+" does not have a group.");
        }
        String tmpGroup = serverGroup.getName();
        if (sourceGroupName != null && !tmpGroup.equals(sourceGroupName)) {
          throw new ConstraintException(
              "Move server request should only come from one source group. "+
              "Expecting only "+sourceGroupName+" but contains "+tmpGroup);
        }
      }

      if (sourceGroupName.equals(targetGroupName)) {
        throw new ConstraintException(
            "Target group is the same as source group: "+targetGroupName);
      }

      try {
        // Mark the servers as in transition.
        for (HostAndPort server : servers) {
          serversInTransition.put(server, targetGroupName);
        }

        manager.moveServers(servers, sourceGroupName, targetGroupName);
        boolean found;
        List<HostAndPort> tmpServers = Lists.newArrayList(servers);
        do {
          found = false;
          for (Iterator<HostAndPort> iter = tmpServers.iterator();
               iter.hasNext(); ) {
            HostAndPort rs = iter.next();
            List<HRegionInfo> regions = getRegionsOn(rs);

            // Unassign regions for a server.
            LOG.info("Unassigning " + regions.size() +
                " regions from server " + rs + " for move to " + targetGroupName);
            //TODO bulk unassign or throttled unassign?
            for (HRegionInfo region : regions) {
              // Regions of tables of the target group may already have been
              // assigned here, so those must be left alone.
              if (!targetGrp.containsTable(region.getTable())) {
                master.getAssignmentManager().unassign(region);
                found = true;
              }
            }
            if (!found) {
              iter.remove();
            }
          }
          try {
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            LOG.warn("Sleep interrupted", e);
            Thread.currentThread().interrupt();
          }
        } while (found);
      } finally {
        // Remove from transition, whether or not the move succeeded.
        for (HostAndPort server : servers) {
          serversInTransition.remove(server);
        }
      }
      if (master.getMasterCoprocessorHost() != null) {
        master.getMasterCoprocessorHost().postMoveServers(servers, targetGroupName);
      }
      LOG.info("Move server done: "+sourceGroupName+"->"+targetGroupName);
    }
  }

  /**
   * Collects the regions currently assigned to, or in transition on, the
   * server with the given host and port.
   */
  private List<HRegionInfo> getRegionsOn(HostAndPort rs) {
    List<HRegionInfo> regions = new LinkedList<HRegionInfo>();
    for (Map.Entry<HRegionInfo, ServerName> el :
        master.getAssignmentManager().getRegionStates().getRegionAssignments().entrySet()) {
      if (el.getValue().getHostPort().equals(rs)) {
        regions.add(el.getKey());
      }
    }
    Iterator<RegionState> i =
        master.getAssignmentManager().getRegionStates().getRegionsInTransition().iterator();
    while (i.hasNext()) {
      RegionState state = i.next();
      if (state.getServerName().getHostPort().equals(rs)) {
        regions.add(state.getRegion());
      }
    }
    return regions;
  }

  /**
   * Moves tables to {@code targetGroup} and then unassigns every region of
   * each moved table while holding that table's write lock.
   *
   * @param tables tables to move; an empty set is ignored
   * @param targetGroup destination group name; {@code null} skips the
   *        destination checks and is passed through to the manager as-is
   * @throws ConstraintException if {@code tables} is null, the target group
   *         does not exist or has no servers, or a table is already in the
   *         target group
   * @throws IOException on failures reported by the manager or the master
   */
  @Override
  public void moveTables(Set<TableName> tables, String targetGroup) throws IOException {
    if (tables == null) {
      throw new ConstraintException(
          "The list of tables cannot be null.");
    }
    if (tables.size() < 1) {
      LOG.debug("moveTables() passed an empty set. Ignoring.");
      return;
    }
    RSGroupInfoManager manager = getRSGroupInfoManager();
    synchronized (manager) {
      if (master.getMasterCoprocessorHost() != null) {
        master.getMasterCoprocessorHost().preMoveTables(tables, targetGroup);
      }

      if (targetGroup != null) {
        RSGroupInfo destGroup = manager.getRSGroup(targetGroup);
        if (destGroup == null) {
          throw new ConstraintException("Target group does not exist: "+targetGroup);
        }
        if (destGroup.getServers().size() < 1) {
          throw new ConstraintException("Target group must have at least one server.");
        }
      }

      for (TableName table : tables) {
        String srcGroup = manager.getRSGroupOfTable(table);
        if (srcGroup != null && srcGroup.equals(targetGroup)) {
          throw new ConstraintException(
              "Source group is the same as target group for table "+table+" :"+srcGroup);
        }
      }
      manager.moveTables(tables, targetGroup);
      if (master.getMasterCoprocessorHost() != null) {
        master.getMasterCoprocessorHost().postMoveTables(tables, targetGroup);
      }
    }
    for (TableName table : tables) {
      TableLock lock = master.getTableLockManager().writeLock(table, "Group: table move");
      // Acquire before entering the try so that a failed acquire does not
      // trigger release() on a lock that was never taken.
      lock.acquire();
      try {
        for (HRegionInfo region :
            master.getAssignmentManager().getRegionStates().getRegionsOfTable(table)) {
          master.getAssignmentManager().unassign(region);
        }
      } finally {
        lock.release();
      }
    }
  }

  /**
   * Creates a new, empty group with the given name, running the
   * pre/post coprocessor hooks when a coprocessor host is present.
   */
  @Override
  public void addRSGroup(String name) throws IOException {
    if (master.getMasterCoprocessorHost() != null) {
      master.getMasterCoprocessorHost().preAddRSGroup(name);
    }
    getRSGroupInfoManager().addRSGroup(new RSGroupInfo(name));
    if (master.getMasterCoprocessorHost() != null) {
      master.getMasterCoprocessorHost().postAddRSGroup(name);
    }
  }

  /**
   * Removes a group.
   *
   * @param name name of the group to remove
   * @throws ConstraintException if the group does not exist, still has tables
   *         or servers, or is referenced by a namespace
   */
  @Override
  public void removeRSGroup(String name) throws IOException {
    RSGroupInfoManager manager = getRSGroupInfoManager();
    synchronized (manager) {
      if (master.getMasterCoprocessorHost() != null) {
        master.getMasterCoprocessorHost().preRemoveRSGroup(name);
      }
      RSGroupInfo group = manager.getRSGroup(name);
      if (group == null) {
        throw new ConstraintException("Group "+name+" does not exist");
      }
      int tableCount = group.getTables().size();
      if (tableCount > 0) {
        throw new ConstraintException("Group "+name+" must have no associated tables: "+tableCount);
      }
      int serverCount = group.getServers().size();
      if (serverCount > 0) {
        throw new ConstraintException(
            "Group "+name+" must have no associated servers: "+serverCount);
      }
      for (NamespaceDescriptor ns : master.getTableNamespaceManager().list()) {
        String nsGroup = ns.getConfigurationValue(RSGroupInfo.NAMESPACEDESC_PROP_GROUP);
        if (nsGroup != null && nsGroup.equals(name)) {
          throw new ConstraintException("Group "+name+" is referenced by namespace: "+ns.getName());
        }
      }
      manager.removeRSGroup(name);
      if (master.getMasterCoprocessorHost() != null) {
        master.getMasterCoprocessorHost().postRemoveRSGroup(name);
      }
    }
  }

  /**
   * Balances the regions of one group, table by table, using the master's
   * load balancer. Holds the balancer's monitor for the whole run.
   *
   * @param groupName group to balance
   * @return {@code false} when the run is skipped because regions of the
   *         group are in transition or dead servers are being processed;
   *         {@code true} otherwise
   * @throws ConstraintException if the group does not exist
   */
  @Override
  public boolean balanceRSGroup(String groupName) throws IOException {
    ServerManager serverManager = master.getServerManager();
    AssignmentManager assignmentManager = master.getAssignmentManager();
    LoadBalancer balancer = master.getLoadBalancer();

    boolean balancerRan;
    synchronized (balancer) {
      if (master.getMasterCoprocessorHost() != null) {
        master.getMasterCoprocessorHost().preBalanceRSGroup(groupName);
      }
      if (getRSGroupInfo(groupName) == null) {
        throw new ConstraintException("Group does not exist: "+groupName);
      }
      // Only allow one balance run at a time.
      Map<String, RegionState> groupRIT = rsGroupGetRegionsInTransition(groupName);
      if (groupRIT.size() > 0) {
        LOG.debug("Not running balancer because " +
          groupRIT.size() +
          " region(s) in transition: " +
          StringUtils.abbreviate(
              master.getAssignmentManager().getRegionStates().getRegionsInTransition().toString(),
              256));
        return false;
      }
      if (serverManager.areDeadServersInProgress()) {
        LOG.debug("Not running balancer because processing dead regionserver(s): " +
            serverManager.getDeadServers());
        return false;
      }

      // We balance per group instead of per table.
      List<RegionPlan> plans = new ArrayList<RegionPlan>();
      for (Map.Entry<TableName, Map<ServerName, List<HRegionInfo>>> tableMap :
          getRSGroupAssignmentsByTable(groupName).entrySet()) {
        LOG.info("Creating partial plan for table "+tableMap.getKey()+": "+tableMap.getValue());
        List<RegionPlan> partialPlans = balancer.balanceCluster(tableMap.getValue());
        LOG.info("Partial plan for table "+tableMap.getKey()+": "+partialPlans);
        if (partialPlans != null) {
          plans.addAll(partialPlans);
        }
      }
      long startTime = System.currentTimeMillis();
      // Reaching this point means the balancer was consulted, even if it
      // produced no plans.
      balancerRan = true;
      if (!plans.isEmpty()) {
        LOG.info("Group balance "+groupName+" starting with plan count: "+plans.size());
        for (RegionPlan plan : plans) {
          LOG.info("balance " + plan);
          assignmentManager.balance(plan);
        }
        // The elapsed value is a currentTimeMillis() difference, i.e. milliseconds.
        LOG.info("Group balance "+groupName+" completed after "+
            (System.currentTimeMillis()-startTime)+" ms");
      }
      if (master.getMasterCoprocessorHost() != null) {
        master.getMasterCoprocessorHost().postBalanceRSGroup(groupName, balancerRan);
      }
    }
    return balancerRan;
  }

  /** Returns the groups reported by the manager. */
  @Override
  public List<RSGroupInfo> listRSGroups() throws IOException {
    return getRSGroupInfoManager().listRSGroups();
  }

  /** Returns the group the manager reports for the given server. */
  @Override
  public RSGroupInfo getRSGroupOfServer(HostAndPort hostPort) throws IOException {
    return getRSGroupInfoManager().getRSGroupOfServer(hostPort);
  }

  /** Returns the manager this admin was constructed with. */
  @InterfaceAudience.Private
  public RSGroupInfoManager getRSGroupInfoManager() throws IOException {
    return rsGroupInfoManager;
  }

  /**
   * Collects the transition state of every region of every table in the
   * group, keyed by encoded region name. Regions without a transition state
   * are omitted.
   */
  private Map<String, RegionState> rsGroupGetRegionsInTransition(String groupName)
      throws IOException {
    Map<String, RegionState> rit = Maps.newTreeMap();
    AssignmentManager am = master.getAssignmentManager();
    RSGroupInfo groupInfo = getRSGroupInfo(groupName);
    for (TableName tableName : groupInfo.getTables()) {
      for (HRegionInfo regionInfo : am.getRegionStates().getRegionsOfTable(tableName)) {
        RegionState state =
            master.getAssignmentManager().getRegionStates().getRegionTransitionState(regionInfo);
        if (state != null) {
          rit.put(regionInfo.getEncodedName(), state);
        }
      }
    }
    return rit;
  }

  /**
   * Builds, for every table of the group that has at least one assigned
   * region, a map from server to the regions of that table it hosts. Every
   * online server of the group appears in each table's map, with an empty
   * list when it hosts none of the table's regions.
   */
  private Map<TableName, Map<ServerName, List<HRegionInfo>>>
      getRSGroupAssignmentsByTable(String groupName) throws IOException {
    Map<TableName, Map<ServerName, List<HRegionInfo>>> result = Maps.newHashMap();
    RSGroupInfo groupInfo = getRSGroupInfo(groupName);
    Map<TableName, Map<ServerName, List<HRegionInfo>>> assignments = Maps.newHashMap();
    for (Map.Entry<HRegionInfo, ServerName> entry :
        master.getAssignmentManager().getRegionStates().getRegionAssignments().entrySet()) {
      TableName currTable = entry.getKey().getTable();
      ServerName currServer = entry.getValue();
      HRegionInfo currRegion = entry.getKey();
      if (groupInfo.getTables().contains(currTable)) {
        if (!assignments.containsKey(currTable)) {
          assignments.put(currTable, new HashMap<ServerName, List<HRegionInfo>>());
        }
        if (!assignments.get(currTable).containsKey(currServer)) {
          assignments.get(currTable).put(currServer, new ArrayList<HRegionInfo>());
        }
        assignments.get(currTable).get(currServer).add(currRegion);
      }
    }

    Map<ServerName, List<HRegionInfo>> serverMap = Maps.newHashMap();
    for (ServerName serverName : master.getServerManager().getOnlineServers().keySet()) {
      if (groupInfo.getServers().contains(serverName.getHostPort())) {
        serverMap.put(serverName, Collections.<HRegionInfo> emptyList());
      }
    }

    // Add all tables that are members of the group.
    for (TableName tableName : groupInfo.getTables()) {
      if (assignments.containsKey(tableName)) {
        result.put(tableName, new HashMap<ServerName, List<HRegionInfo>>());
        result.get(tableName).putAll(serverMap);
        // Real assignments overwrite the empty placeholders put above.
        result.get(tableName).putAll(assignments.get(tableName));
        LOG.debug("Adding assignments for "+tableName+": "+assignments.get(tableName));
      }
    }

    return result;
  }

  /**
   * Makes sure a table is a member of the group configured on its namespace
   * (or of the default group when the namespace configures none), moving the
   * table there if it is not already a member.
   *
   * @param desc descriptor of the table
   * @throws ConstraintException if the resolved group does not exist
   */
  public void prepareRSGroupForTable(HTableDescriptor desc) throws IOException {
    String groupName =
        master.getTableNamespaceManager().get(desc.getTableName().getNamespaceAsString())
                .getConfigurationValue(RSGroupInfo.NAMESPACEDESC_PROP_GROUP);
    if (groupName == null) {
      groupName = RSGroupInfo.DEFAULT_GROUP;
    }
    RSGroupInfo groupInfo = getRSGroupInfo(groupName);
    if (groupInfo == null) {
      throw new ConstraintException("RSGroup " + groupName + " does not exist.");
    }
    if (!groupInfo.containsTable(desc.getTableName())) {
      LOG.debug("Pre-moving table " + desc.getTableName() + " to rsgroup " + groupName);
      moveTables(Sets.newHashSet(desc.getTableName()), groupName);
    }
  }

  /**
   * Best-effort removal of a table from its group by moving it to the
   * {@code null} group. Failures are logged at debug level and not
   * propagated.
   *
   * @param tableName table whose group membership should be dropped
   */
  public void cleanupRSGroupForTable(TableName tableName) throws IOException {
    try {
      RSGroupInfo group = getRSGroupInfoOfTable(tableName);
      if (group != null) {
        LOG.debug("Removing deleted table from table rsgroup " + group.getName());
        moveTables(Sets.newHashSet(tableName), null);
      }
    } catch (ConstraintException ex) {
      LOG.debug("Failed to perform rsgroup information cleanup for table: " + tableName, ex);
    } catch (IOException ex) {
      LOG.debug("Failed to perform rsgroup information cleanup for table: " + tableName, ex);
    }
  }

  /** No resources to release. */
  @Override
  public void close() throws IOException {
  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/52f05079/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java 
---------------------------------------------------------------------- diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java new file mode 100644 index 0000000..fea1275 --- /dev/null +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java @@ -0,0 +1,428 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.hadoop.hbase.rsgroup;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBase-6721)
 * It does region balance based on a table's group membership.
 *
 * Most assignment methods contain two exclusive code paths: Online - when the group
 * table is online and Offline - when it is unavailable.
 *
 * During Offline, assignments are assigned based on cached information in zookeeper.
 * If unavailable (ie bootstrap) then regions are assigned randomly.
 *
 * Once the GROUP table has been assigned, the balancer switches to Online and will then
 * start providing appropriate assignments for user tables.
 *
 */
@InterfaceAudience.Private
public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalancer {
  /** Config for pluggable load balancers */
  public static final String HBASE_GROUP_LOADBALANCER_CLASS = "hbase.group.grouploadbalancer.class";

  private static final Log LOG = LogFactory.getLog(RSGroupBasedLoadBalancer.class);

  private Configuration config;
  private ClusterStatus clusterStatus;
  private MasterServices masterServices;
  private RSGroupInfoManager rsGroupInfoManager;
  // Balancer that does the actual placement once regions and servers have
  // been partitioned by group; created in initialize().
  private LoadBalancer internalBalancer;

  // Used during reflection by LoadBalancerFactory.
  @InterfaceAudience.Private
  public RSGroupBasedLoadBalancer() {
  }

  // This constructor should only be used for unit testing.
  @InterfaceAudience.Private
  public RSGroupBasedLoadBalancer(RSGroupInfoManager RSGroupInfoManager) {
    this.rsGroupInfoManager = RSGroupInfoManager;
  }

  @Override
  public Configuration getConf() {
    return config;
  }

  @Override
  public void setConf(Configuration conf) {
    this.config = conf;
  }

  @Override
  public void setClusterStatus(ClusterStatus st) {
    this.clusterStatus = st;
  }

  @Override
  public void setMasterServices(MasterServices masterServices) {
    this.masterServices = masterServices;
  }

  /** Ignores {@code tableName} and delegates to {@link #balanceCluster(Map)}. */
  @Override
  public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List<HRegionInfo>>
      clusterState) throws HBaseIOException {
    return balanceCluster(clusterState);
  }

  /**
   * Produces region plans group by group. Regions found on a server outside
   * their table's group get a plan with neither source nor destination;
   * the remaining regions are balanced by the internal balancer within the
   * servers of each group.
   *
   * @param clusterState current regions per server
   * @return the combined plans; empty if listing the groups or balancing a
   *         group fails with an {@link IOException}
   * @throws ConstraintException if the group manager is not online
   */
  @Override
  public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState)
      throws HBaseIOException {
    if (!isOnline()) {
      throw new ConstraintException(RSGroupInfoManager.RSGROUP_TABLE_NAME +
          " is not online, unable to perform balance");
    }

    Map<ServerName,List<HRegionInfo>> correctedState = correctAssignments(clusterState);
    List<RegionPlan> regionPlans = new ArrayList<RegionPlan>();

    List<HRegionInfo> misplacedRegions = correctedState.get(LoadBalancer.BOGUS_SERVER_NAME);
    for (HRegionInfo regionInfo : misplacedRegions) {
      regionPlans.add(new RegionPlan(regionInfo, null, null));
    }
    try {
      for (RSGroupInfo info : rsGroupInfoManager.listRSGroups()) {
        Map<ServerName, List<HRegionInfo>> groupClusterState =
            new HashMap<ServerName, List<HRegionInfo>>();
        for (HostAndPort sName : info.getServers()) {
          for (ServerName curr : clusterState.keySet()) {
            if (curr.getHostPort().equals(sName)) {
              groupClusterState.put(curr, correctedState.get(curr));
            }
          }
        }
        List<RegionPlan> groupPlans = this.internalBalancer
            .balanceCluster(groupClusterState);
        if (groupPlans != null) {
          regionPlans.addAll(groupPlans);
        }
      }
    } catch (IOException exp) {
      LOG.warn("Exception while balancing cluster.", exp);
      regionPlans.clear();
    }
    return regionPlans;
  }

  /**
   * Round-robin assignment performed independently per group: the regions of
   * each group are distributed over that group's candidate servers by the
   * internal balancer.
   */
  @Override
  public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(
      List<HRegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
    Map<ServerName, List<HRegionInfo>> assignments = Maps.newHashMap();
    ListMultimap<String,HRegionInfo> regionMap = ArrayListMultimap.create();
    ListMultimap<String,ServerName> serverMap = ArrayListMultimap.create();
    generateGroupMaps(regions, servers, regionMap, serverMap);
    for (String groupKey : regionMap.keySet()) {
      if (regionMap.get(groupKey).size() > 0) {
        Map<ServerName, List<HRegionInfo>> result =
            this.internalBalancer.roundRobinAssignment(
                regionMap.get(groupKey),
                serverMap.get(groupKey));
        if (result != null) {
          assignments.putAll(result);
        }
      }
    }
    return assignments;
  }

  /**
   * Retains existing assignments where the current server belongs to the
   * region's group; misplaced regions are re-assigned to a random server of
   * their group, or to {@link LoadBalancer#BOGUS_SERVER_NAME} when the
   * internal balancer returns no server.
   *
   * @param regions current region-to-server map
   * @param servers candidate servers
   * @return the new assignments per server
   * @throws HBaseIOException wrapping any {@link IOException} raised while
   *         reading group information
   */
  @Override
  public Map<ServerName, List<HRegionInfo>> retainAssignment(
      Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
    try {
      Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
      ListMultimap<String, HRegionInfo> groupToRegion = ArrayListMultimap.create();
      Set<HRegionInfo> misplacedRegions = getMisplacedRegions(regions);
      for (HRegionInfo region : regions.keySet()) {
        if (!misplacedRegions.contains(region)) {
          String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
          groupToRegion.put(groupName, region);
        }
      }
      // Now the "groupToRegion" map has only the regions which have correct
      // assignments.
      for (String key : groupToRegion.keySet()) {
        Map<HRegionInfo, ServerName> currentAssignmentMap = new TreeMap<HRegionInfo, ServerName>();
        List<HRegionInfo> regionList = groupToRegion.get(key);
        RSGroupInfo info = rsGroupInfoManager.getRSGroup(key);
        List<ServerName> candidateList = filterOfflineServers(info, servers);
        for (HRegionInfo region : regionList) {
          currentAssignmentMap.put(region, regions.get(region));
        }
        if (candidateList.size() > 0) {
          assignments.putAll(this.internalBalancer.retainAssignment(
              currentAssignmentMap, candidateList));
        }
      }

      for (HRegionInfo region : misplacedRegions) {
        String groupName = rsGroupInfoManager.getRSGroupOfTable(
            region.getTable());
        RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName);
        List<ServerName> candidateList = filterOfflineServers(info, servers);
        ServerName server = this.internalBalancer.randomAssignment(region,
            candidateList);
        // If no server is available assign to bogus so it ends up in RIT.
        ServerName target = (server != null) ? server : LoadBalancer.BOGUS_SERVER_NAME;
        if (!assignments.containsKey(target)) {
          assignments.put(target, new ArrayList<HRegionInfo>());
        }
        // Always record the region, including when the list was just created.
        assignments.get(target).add(region);
      }
      return assignments;
    } catch (IOException e) {
      throw new HBaseIOException("Failed to do online retain assignment", e);
    }
  }

  /** Not supported by this balancer. */
  @Override
  public Map<HRegionInfo, ServerName> immediateAssignment(List<HRegionInfo> regions,
      List<ServerName> servers) throws HBaseIOException {
    throw new UnsupportedOperationException("immediateAssignment is not supported");
  }

  /**
   * Picks a random server for one region among the candidate servers of the
   * region's group.
   */
  @Override
  public ServerName randomAssignment(HRegionInfo region,
      List<ServerName> servers) throws HBaseIOException {
    ListMultimap<String,HRegionInfo> regionMap = LinkedListMultimap.create();
    ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
    generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap);
    // Exactly one region was passed in, so regionMap has exactly one key.
    List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next());
    return this.internalBalancer.randomAssignment(region, filteredServers);
  }

  /**
   * Partitions regions and servers by group name.
   *
   * @param regions regions to partition by the group of their table
   * @param servers candidate servers
   * @param regionMap out: group name to regions (a region whose table has no
   *        group is filed under the {@code null} key after a warning)
   * @param serverMap out: group name to the candidate servers that belong to
   *        the group, or to {@link LoadBalancer#BOGUS_SERVER_NAME} alone when
   *        there is none
   * @throws HBaseIOException wrapping any {@link IOException} raised while
   *         reading group information
   */
  private void generateGroupMaps(
    List<HRegionInfo> regions,
    List<ServerName> servers,
    ListMultimap<String, HRegionInfo> regionMap,
    ListMultimap<String, ServerName> serverMap) throws HBaseIOException {
    try {
      for (HRegionInfo region : regions) {
        String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
        if (groupName == null) {
          LOG.warn("Group for table "+region.getTable()+" is null");
        }
        regionMap.put(groupName, region);
      }
      for (String groupKey : regionMap.keySet()) {
        RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey);
        serverMap.putAll(groupKey, filterOfflineServers(info, servers));
        if (serverMap.get(groupKey).size() < 1) {
          serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME);
        }
      }
    } catch (IOException e) {
      throw new HBaseIOException("Failed to generate group maps", e);
    }
  }

  /**
   * Returns the members of {@code onlineServers} that belong to the group,
   * or an empty list when the group is {@code null}.
   */
  private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo,
                                                List<ServerName> onlineServers) {
    if (RSGroupInfo != null) {
      return filterServers(RSGroupInfo.getServers(), onlineServers);
    } else {
      LOG.debug("Group Information found to be null. Some regions might be unassigned.");
      return Collections.<ServerName> emptyList();
    }
  }

  /**
   * Filter servers based on the online servers.
   *
   * @param servers
   *          the servers
   * @param onlineServers
   *          List of servers which are online.
   * @return the online servers whose host and port match one of
   *         {@code servers}
   */
  private List<ServerName> filterServers(Collection<HostAndPort> servers,
      Collection<ServerName> onlineServers) {
    ArrayList<ServerName> finalList = new ArrayList<ServerName>();
    for (HostAndPort server : servers) {
      for (ServerName curr : onlineServers) {
        if (curr.getHostPort().equals(server)) {
          finalList.add(curr);
        }
      }
    }
    return finalList;
  }

  /** Groups regions by the group name of their table. */
  private ListMultimap<String, HRegionInfo> groupRegions(
      List<HRegionInfo> regionList) throws IOException {
    ListMultimap<String, HRegionInfo> regionGroup = ArrayListMultimap
        .create();
    for (HRegionInfo region : regionList) {
      String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
      regionGroup.put(groupName, region);
    }
    return regionGroup;
  }

  /**
   * Finds the regions whose assigned server is not a member of the group of
   * the region's table (or whose table has no resolvable group). Regions
   * without an assigned server are never reported.
   */
  private Set<HRegionInfo> getMisplacedRegions(
      Map<HRegionInfo, ServerName> regions) throws IOException {
    Set<HRegionInfo> misplacedRegions = new HashSet<HRegionInfo>();
    for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
      HRegionInfo region = entry.getKey();
      ServerName assignedServer = entry.getValue();
      String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
      RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName);
      if (assignedServer != null &&
          (info == null || !info.containsServer(assignedServer.getHostPort()))) {
        // info may be null on this branch, so fall back to the looked-up
        // group name rather than dereferencing it.
        LOG.debug("Found misplaced region: " + region.getRegionNameAsString() +
            " on server: " + assignedServer +
            " found in group: " +
            rsGroupInfoManager.getRSGroupOfServer(assignedServer.getHostPort()) +
            " outside of group: " + (info == null ? groupName : info.getName()));
        misplacedRegions.add(region);
      }
    }
    return misplacedRegions;
  }

  /**
   * Rewrites the given assignments so that every region hosted by a server
   * outside its table's group (or whose group cannot be resolved) is filed
   * under {@link LoadBalancer#BOGUS_SERVER_NAME}; all other regions stay with
   * their server. Every input server gets an entry, possibly empty.
   */
  private Map<ServerName, List<HRegionInfo>> correctAssignments(
       Map<ServerName, List<HRegionInfo>> existingAssignments){
    Map<ServerName, List<HRegionInfo>> correctAssignments =
        new TreeMap<ServerName, List<HRegionInfo>>();
    correctAssignments.put(LoadBalancer.BOGUS_SERVER_NAME, new LinkedList<HRegionInfo>());
    for (Map.Entry<ServerName, List<HRegionInfo>> existing : existingAssignments.entrySet()) {
      ServerName sName = existing.getKey();
      correctAssignments.put(sName, new LinkedList<HRegionInfo>());
      for (HRegionInfo region : existing.getValue()) {
        RSGroupInfo info = null;
        try {
          info = rsGroupInfoManager.getRSGroup(
              rsGroupInfoManager.getRSGroupOfTable(region.getTable()));
        } catch (IOException exp) {
          // Treated like "no group": the region is filed as misplaced below.
          LOG.debug("Group information null for region of table " + region.getTable(),
              exp);
        }
        if ((info == null) || (!info.containsServer(sName.getHostPort()))) {
          correctAssignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
        } else {
          correctAssignments.get(sName).add(region);
        }
      }
    }
    // Misplaced regions are not unassigned here; balanceCluster turns the
    // BOGUS_SERVER_NAME entries into region plans.
    return correctAssignments;
  }

  /**
   * Resolves the group manager from the single {@code RSGroupAdminEndpoint}
   * coprocessor (unless one was injected through the constructor) and creates
   * and initializes the internal balancer configured under
   * {@link #HBASE_GROUP_LOADBALANCER_CLASS}, defaulting to
   * {@link StochasticLoadBalancer}.
   *
   * @throws HBaseIOException if the number of endpoint coprocessors is not
   *         exactly one, or the manager cannot be obtained
   */
  @Override
  public void initialize() throws HBaseIOException {
    try {
      if (rsGroupInfoManager == null) {
        List<RSGroupAdminEndpoint> cps =
          masterServices.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class);
        if (cps.size() != 1) {
          String msg = "Expected one implementation of GroupAdminEndpoint but found " + cps.size();
          LOG.error(msg);
          throw new HBaseIOException(msg);
        }
        rsGroupInfoManager = cps.get(0).getGroupInfoManager();
      }
    } catch (IOException e) {
      throw new HBaseIOException("Failed to initialize GroupInfoManagerImpl", e);
    }

    // Create the balancer
    Class<? extends LoadBalancer> balancerKlass = config.getClass(
        HBASE_GROUP_LOADBALANCER_CLASS,
        StochasticLoadBalancer.class, LoadBalancer.class);
    internalBalancer = ReflectionUtils.newInstance(balancerKlass, config);
    internalBalancer.setClusterStatus(clusterStatus);
    internalBalancer.setMasterServices(masterServices);
    internalBalancer.setConf(config);
    internalBalancer.initialize();
  }

  /** Returns whether a group manager is set and reports itself online. */
  public boolean isOnline() {
    return rsGroupInfoManager != null && rsGroupInfoManager.isOnline();
  }

  @Override
  public void regionOnline(HRegionInfo regionInfo, ServerName sn) {
  }

  @Override
  public void regionOffline(HRegionInfo regionInfo) {
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
    //DO nothing for now
  }

  @Override
  public void stop(String why) {
  }

  @Override
  public boolean isStopped() {
    return false;
  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/52f05079/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java ---------------------------------------------------------------------- diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java new file mode 100644 index 0000000..434c85f --- /dev/null +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java @@ -0,0 +1,132 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.hadoop.hbase.rsgroup;

import com.google.common.net.HostAndPort;

import java.io.IOException;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Storage abstraction for {@link RSGroupInfo} records. Implementations may
 * optionally support an offline mode.
 * See {@link RSGroupBasedLoadBalancer}
 */
public interface RSGroupInfoManager {
  // Interface fields are implicitly public, static and final.

  /** Name of the rsgroup table in the system namespace; assigned before user tables. */
  TableName RSGROUP_TABLE_NAME =
      TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup");
  /** {@link #RSGROUP_TABLE_NAME} as bytes. */
  byte[] RSGROUP_TABLE_NAME_BYTES = RSGROUP_TABLE_NAME.toBytes();
  String rsGroupZNode = "rsgroup";
  byte[] META_FAMILY_BYTES = Bytes.toBytes("m");
  byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i");
  byte[] ROW_KEY = {0};

  /**
   * Adds a region server group.
   *
   * @param rsGroupInfo the group to add
   * @throws java.io.IOException Signals that an I/O exception has occurred.
   */
  void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException;

  /**
   * Removes a region server group.
   *
   * @param groupName name of the group to remove
   * @throws java.io.IOException Signals that an I/O exception has occurred.
   */
  void removeRSGroup(String groupName) throws IOException;

  /**
   * Moves servers from one group to another.
   *
   * @param hostPorts servers to move; must all be part of the same group
   * @param srcGroup name of the group being moved from
   * @param dstGroup name of the group being moved to
   * @return true if the move was successful
   * @throws java.io.IOException on move failure
   */
  boolean moveServers(Set<HostAndPort> hostPorts,
                      String srcGroup, String dstGroup) throws IOException;

  /**
   * Gets the group a server belongs to.
   *
   * @param hostPort the server
   * @return An instance of RSGroupInfo
   */
  RSGroupInfo getRSGroupOfServer(HostAndPort hostPort) throws IOException;

  /**
   * Gets the information of a group.
   *
   * @param groupName the group name
   * @return An instance of RSGroupInfo
   */
  RSGroupInfo getRSGroup(String groupName) throws IOException;

  /**
   * Gets the group membership of a table.
   *
   * @param tableName name of the table to look up
   * @return name of the group the table belongs to
   * @throws java.io.IOException on failure to retrieve the information
   */
  String getRSGroupOfTable(TableName tableName) throws IOException;

  /**
   * Sets the group membership of a set of tables.
   *
   * @param tableNames tables to move
   * @param groupName name of the group to move the tables to
   * @throws java.io.IOException on failure to move
   */
  void moveTables(Set<TableName> tableNames, String groupName) throws IOException;

  /**
   * Lists the groups.
   *
   * @return list of RSGroupInfo
   * @throws java.io.IOException on failure
   */
  List<RSGroupInfo> listRSGroups() throws IOException;

  /**
   * Refreshes/reloads the group information from the persistent store.
   *
   * @throws java.io.IOException on failure to refresh
   */
  void refresh() throws IOException;

  /**
   * Whether the manager is able to fully return group metadata.
   *
   * @return whether the manager is in online mode
   */
  boolean isOnline();
}