http://git-wip-us.apache.org/repos/asf/hbase/blob/a860e48a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java new file mode 100644 index 0000000..7fcb7c7 --- /dev/null +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java @@ -0,0 +1,758 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.rsgroup; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import com.google.common.collect.Sets; +import com.google.common.net.HostAndPort; +import com.google.protobuf.ServiceException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableStateManager; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.constraint.ConstraintException; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.master.MasterServices; +import 
org.apache.hadoop.hbase.master.ServerListener; +import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure; +import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; +import org.apache.hadoop.hbase.security.access.AccessControlLists; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ModifyRegionUtils; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * This is an implementation of {@link RSGroupInfoManager}. Which makes + * use of an HBase table as the persistence store for the group information. + * It also makes use of zookeeper to store group information needed + * for bootstrapping during offline mode. 
+ */ +public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListener { + private static final Log LOG = LogFactory.getLog(RSGroupInfoManagerImpl.class); + + /** Table descriptor for <code>hbase:rsgroup</code> catalog table */ + private final static HTableDescriptor RSGROUP_TABLE_DESC; + static { + RSGROUP_TABLE_DESC = new HTableDescriptor(RSGROUP_TABLE_NAME_BYTES); + RSGROUP_TABLE_DESC.addFamily(new HColumnDescriptor(META_FAMILY_BYTES)); + RSGROUP_TABLE_DESC.setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName()); + try { + RSGROUP_TABLE_DESC.addCoprocessor( + MultiRowMutationEndpoint.class.getName(), + null, Coprocessor.PRIORITY_SYSTEM, null); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + private volatile Map<String, RSGroupInfo> rsGroupMap; + private volatile Map<TableName, String> tableMap; + private MasterServices master; + private Table rsGroupTable; + private ClusterConnection conn; + private ZooKeeperWatcher watcher; + private RSGroupStartupWorker rsGroupStartupWorker; + // contains list of groups that were last flushed to persistent store + private volatile Set<String> prevRSGroups; + private RSGroupSerDe rsGroupSerDe; + private DefaultServerUpdater defaultServerUpdater; + + + public RSGroupInfoManagerImpl(MasterServices master) throws IOException { + this.rsGroupMap = Collections.emptyMap(); + this.tableMap = Collections.emptyMap(); + rsGroupSerDe = new RSGroupSerDe(); + this.master = master; + this.watcher = master.getZooKeeper(); + this.conn = master.getConnection(); + rsGroupStartupWorker = new RSGroupStartupWorker(this, master, conn); + prevRSGroups = new HashSet<String>(); + refresh(); + rsGroupStartupWorker.start(); + defaultServerUpdater = new DefaultServerUpdater(this); + master.getServerManager().registerListener(this); + defaultServerUpdater.start(); + } + + /** + * Adds the group. 
+ * + * @param rsGroupInfo the group name + */ + @Override + public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException { + checkGroupName(rsGroupInfo.getName()); + if (rsGroupMap.get(rsGroupInfo.getName()) != null || + rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { + throw new DoNotRetryIOException("Group already exists: "+ rsGroupInfo.getName()); + } + Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); + newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo); + flushConfig(newGroupMap); + } + + @Override + public synchronized boolean moveServers(Set<HostAndPort> hostPorts, String srcGroup, + String dstGroup) throws IOException { + if (!rsGroupMap.containsKey(srcGroup)) { + throw new DoNotRetryIOException("Group "+srcGroup+" does not exist"); + } + if (!rsGroupMap.containsKey(dstGroup)) { + throw new DoNotRetryIOException("Group "+dstGroup+" does not exist"); + } + + RSGroupInfo src = new RSGroupInfo(getRSGroup(srcGroup)); + RSGroupInfo dst = new RSGroupInfo(getRSGroup(dstGroup)); + boolean foundOne = false; + for(HostAndPort el: hostPorts) { + foundOne = src.removeServer(el) || foundOne; + dst.addServer(el); + } + + Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); + newGroupMap.put(src.getName(), src); + newGroupMap.put(dst.getName(), dst); + + flushConfig(newGroupMap); + return foundOne; + } + + /** + * Gets the group info of server. + * + * @param hostPort the server + * @return An instance of GroupInfo. + */ + @Override + public RSGroupInfo getRSGroupOfServer(HostAndPort hostPort) throws IOException { + for (RSGroupInfo info : rsGroupMap.values()) { + if (info.containsServer(hostPort)){ + return info; + } + } + return null; + } + + /** + * Gets the group information. 
+ * + * @param groupName + * the group name + * @return An instance of GroupInfo + */ + @Override + public RSGroupInfo getRSGroup(String groupName) throws IOException { + RSGroupInfo RSGroupInfo = rsGroupMap.get(groupName); + return RSGroupInfo; + } + + + + @Override + public String getRSGroupOfTable(TableName tableName) throws IOException { + return tableMap.get(tableName); + } + + @Override + public synchronized void moveTables( + Set<TableName> tableNames, String groupName) throws IOException { + if (groupName != null && !rsGroupMap.containsKey(groupName)) { + throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a special group"); + } + + Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); + for(TableName tableName: tableNames) { + if (tableMap.containsKey(tableName)) { + RSGroupInfo src = new RSGroupInfo(rsGroupMap.get(tableMap.get(tableName))); + src.removeTable(tableName); + newGroupMap.put(src.getName(), src); + } + if(groupName != null) { + RSGroupInfo dst = new RSGroupInfo(newGroupMap.get(groupName)); + dst.addTable(tableName); + newGroupMap.put(dst.getName(), dst); + } + } + + flushConfig(newGroupMap); + } + + + /** + * Delete a region server group. + * + * @param groupName the group name + * @throws java.io.IOException Signals that an I/O exception has occurred. 
+ */ + @Override + public synchronized void removeRSGroup(String groupName) throws IOException { + if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) { + throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a reserved group"); + } + Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); + newGroupMap.remove(groupName); + flushConfig(newGroupMap); + } + + @Override + public List<RSGroupInfo> listRSGroups() throws IOException { + List<RSGroupInfo> list = Lists.newLinkedList(rsGroupMap.values()); + return list; + } + + @Override + public boolean isOnline() { + return rsGroupStartupWorker.isOnline(); + } + + @Override + public synchronized void refresh() throws IOException { + refresh(false); + } + + private synchronized void refresh(boolean forceOnline) throws IOException { + List<RSGroupInfo> groupList = new LinkedList<RSGroupInfo>(); + + // overwrite anything read from zk, group table is source of truth + // if online read from GROUP table + if (forceOnline || isOnline()) { + LOG.debug("Refreshing in Online mode."); + if (rsGroupTable == null) { + rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME); + } + groupList.addAll(rsGroupSerDe.retrieveGroupList(rsGroupTable)); + } else { + LOG.debug("Refershing in Offline mode."); + String groupBasePath = ZKUtil.joinZNode(watcher.baseZNode, rsGroupZNode); + groupList.addAll(rsGroupSerDe.retrieveGroupList(watcher, groupBasePath)); + } + + // refresh default group, prune + NavigableSet<TableName> orphanTables = new TreeSet<TableName>(); + for(String entry: master.getTableDescriptors().getAll().keySet()) { + orphanTables.add(TableName.valueOf(entry)); + } + + List<TableName> specialTables; + if(!master.isInitialized()) { + specialTables = new ArrayList<TableName>(); + specialTables.add(AccessControlLists.ACL_TABLE_NAME); + specialTables.add(TableName.META_TABLE_NAME); + specialTables.add(TableName.NAMESPACE_TABLE_NAME); + specialTables.add(RSGROUP_TABLE_NAME); + 
} else { + specialTables = + master.listTableNamesByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR); + } + + for(TableName table : specialTables) { + orphanTables.add(table); + } + for(RSGroupInfo group: groupList) { + if(!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { + orphanTables.removeAll(group.getTables()); + } + } + + // This is added to the last of the list + // so it overwrites the default group loaded + // from region group table or zk + groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, + Sets.newHashSet(getDefaultServers()), + orphanTables)); + + + // populate the data + HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(); + HashMap<TableName, String> newTableMap = Maps.newHashMap(); + for (RSGroupInfo group : groupList) { + newGroupMap.put(group.getName(), group); + for(TableName table: group.getTables()) { + newTableMap.put(table, group.getName()); + } + } + rsGroupMap = Collections.unmodifiableMap(newGroupMap); + tableMap = Collections.unmodifiableMap(newTableMap); + + prevRSGroups.clear(); + prevRSGroups.addAll(rsGroupMap.keySet()); + } + + private synchronized Map<TableName,String> flushConfigTable(Map<String,RSGroupInfo> newGroupMap) + throws IOException { + Map<TableName,String> newTableMap = Maps.newHashMap(); + List<Mutation> mutations = Lists.newArrayList(); + + // populate deletes + for(String groupName : prevRSGroups) { + if(!newGroupMap.containsKey(groupName)) { + Delete d = new Delete(Bytes.toBytes(groupName)); + mutations.add(d); + } + } + + // populate puts + for(RSGroupInfo RSGroupInfo : newGroupMap.values()) { + RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(RSGroupInfo); + Put p = new Put(Bytes.toBytes(RSGroupInfo.getName())); + p.addColumn(META_FAMILY_BYTES, + META_QUALIFIER_BYTES, + proto.toByteArray()); + mutations.add(p); + for(TableName entry: RSGroupInfo.getTables()) { + newTableMap.put(entry, RSGroupInfo.getName()); + } + } + + if(mutations.size() > 0) { + multiMutate(mutations); + 
} + return newTableMap; + } + + private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException { + Map<TableName, String> newTableMap; + + // For offline mode persistence is still unavailable + // We're refreshing in-memory state but only for default servers + if (!isOnline()) { + Map<String, RSGroupInfo> m = Maps.newHashMap(rsGroupMap); + RSGroupInfo oldDefaultGroup = m.remove(RSGroupInfo.DEFAULT_GROUP); + RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP); + if (!m.equals(newGroupMap) || + !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())) { + throw new IOException("Only default servers can be updated during offline mode"); + } + newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup); + rsGroupMap = newGroupMap; + return; + } + + newTableMap = flushConfigTable(newGroupMap); + + // make changes visible since it has been + // persisted in the source of truth + rsGroupMap = Collections.unmodifiableMap(newGroupMap); + tableMap = Collections.unmodifiableMap(newTableMap); + + + try { + String groupBasePath = ZKUtil.joinZNode(watcher.baseZNode, rsGroupZNode); + ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufUtil.PB_MAGIC); + + List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<ZKUtil.ZKUtilOp>(newGroupMap.size()); + for(String groupName : prevRSGroups) { + if(!newGroupMap.containsKey(groupName)) { + String znode = ZKUtil.joinZNode(groupBasePath, groupName); + zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode)); + } + } + + + for(RSGroupInfo RSGroupInfo : newGroupMap.values()) { + String znode = ZKUtil.joinZNode(groupBasePath, RSGroupInfo.getName()); + RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(RSGroupInfo); + LOG.debug("Updating znode: "+znode); + ZKUtil.createAndFailSilent(watcher, znode); + zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode)); + zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode, + ProtobufUtil.prependPBMagic(proto.toByteArray()))); + } + 
LOG.debug("Writing ZK GroupInfo count: " + zkOps.size()); + + ZKUtil.multiOrSequential(watcher, zkOps, false); + } catch (KeeperException e) { + LOG.error("Failed to write to rsGroupZNode", e); + master.abort("Failed to write to rsGroupZNode", e); + throw new IOException("Failed to write to rsGroupZNode",e); + } + + prevRSGroups.clear(); + prevRSGroups.addAll(newGroupMap.keySet()); + } + + private List<ServerName> getOnlineRS() throws IOException { + if (master != null) { + return master.getServerManager().getOnlineServersList(); + } + try { + LOG.debug("Reading online RS from zookeeper"); + List<ServerName> servers = new LinkedList<ServerName>(); + for (String el: ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode)) { + servers.add(ServerName.parseServerName(el)); + } + return servers; + } catch (KeeperException e) { + throw new IOException("Failed to retrieve server list from zookeeper", e); + } + } + + private List<HostAndPort> getDefaultServers() throws IOException { + List<HostAndPort> defaultServers = new LinkedList<HostAndPort>(); + for(ServerName server : getOnlineRS()) { + HostAndPort hostPort = HostAndPort.fromParts(server.getHostname(), server.getPort()); + boolean found = false; + for(RSGroupInfo RSGroupInfo : rsGroupMap.values()) { + if(!RSGroupInfo.DEFAULT_GROUP.equals(RSGroupInfo.getName()) && + RSGroupInfo.containsServer(hostPort)) { + found = true; + break; + } + } + if(!found) { + defaultServers.add(hostPort); + } + } + return defaultServers; + } + + private synchronized void updateDefaultServers( + Set<HostAndPort> hostPort) throws IOException { + RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP); + RSGroupInfo newInfo = new RSGroupInfo(info.getName(), hostPort, info.getTables()); + HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); + newGroupMap.put(newInfo.getName(), newInfo); + flushConfig(newGroupMap); + } + + @Override + public void serverAdded(ServerName serverName) { + defaultServerUpdater.serverChanged(); 
+ } + + @Override + public void serverRemoved(ServerName serverName) { + defaultServerUpdater.serverChanged(); + } + + private static class DefaultServerUpdater extends Thread { + private static final Log LOG = LogFactory.getLog(DefaultServerUpdater.class); + private RSGroupInfoManagerImpl mgr; + private boolean hasChanged = false; + + public DefaultServerUpdater(RSGroupInfoManagerImpl mgr) { + this.mgr = mgr; + } + + @Override + public void run() { + List<HostAndPort> prevDefaultServers = new LinkedList<HostAndPort>(); + while(!mgr.master.isAborted() || !mgr.master.isStopped()) { + try { + LOG.info("Updating default servers."); + List<HostAndPort> servers = mgr.getDefaultServers(); + Collections.sort(servers, new Comparator<HostAndPort>() { + @Override + public int compare(HostAndPort o1, HostAndPort o2) { + int diff = o1.getHostText().compareTo(o2.getHostText()); + if (diff != 0) { + return diff; + } + return o1.getPort() - o2.getPort(); + } + }); + if(!servers.equals(prevDefaultServers)) { + mgr.updateDefaultServers(Sets.<HostAndPort>newHashSet(servers)); + prevDefaultServers = servers; + LOG.info("Updated with servers: "+servers.size()); + } + try { + synchronized (this) { + if(!hasChanged) { + wait(); + } + hasChanged = false; + } + } catch (InterruptedException e) { + } + } catch (IOException e) { + LOG.warn("Failed to update default servers", e); + } + } + } + + public void serverChanged() { + synchronized (this) { + hasChanged = true; + this.notify(); + } + } + } + + @Override + public void waiting() { + + } + + private static class RSGroupStartupWorker extends Thread { + private static final Log LOG = LogFactory.getLog(RSGroupStartupWorker.class); + + private Configuration conf; + private volatile boolean isOnline = false; + private MasterServices masterServices; + private RSGroupInfoManagerImpl groupInfoManager; + private ClusterConnection conn; + + public RSGroupStartupWorker(RSGroupInfoManagerImpl groupInfoManager, + MasterServices masterServices, + 
ClusterConnection conn) { + this.conf = masterServices.getConfiguration(); + this.masterServices = masterServices; + this.groupInfoManager = groupInfoManager; + this.conn = conn; + setName(RSGroupStartupWorker.class.getName()+"-"+masterServices.getServerName()); + setDaemon(true); + } + + @Override + public void run() { + if(waitForGroupTableOnline()) { + LOG.info("GroupBasedLoadBalancer is now online"); + } + } + + public boolean waitForGroupTableOnline() { + final List<HRegionInfo> foundRegions = new LinkedList<HRegionInfo>(); + final List<HRegionInfo> assignedRegions = new LinkedList<HRegionInfo>(); + final AtomicBoolean found = new AtomicBoolean(false); + final TableStateManager tsm = + masterServices.getAssignmentManager().getTableStateManager(); + boolean createSent = false; + while (!found.get() && isMasterRunning()) { + foundRegions.clear(); + assignedRegions.clear(); + found.set(true); + try { + final Table nsTable = conn.getTable(TableName.NAMESPACE_TABLE_NAME); + final Table groupTable = conn.getTable(RSGROUP_TABLE_NAME); + boolean rootMetaFound = + masterServices.getMetaTableLocator().verifyMetaRegionLocation( + conn, + masterServices.getZooKeeper(), + 1); + final AtomicBoolean nsFound = new AtomicBoolean(false); + if (rootMetaFound) { + + MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() { + @Override + public boolean visit(Result row) throws IOException { + + HRegionInfo info = MetaTableAccessor.getHRegionInfo(row); + if (info != null) { + Cell serverCell = + row.getColumnLatestCell(HConstants.CATALOG_FAMILY, + HConstants.SERVER_QUALIFIER); + if (RSGROUP_TABLE_NAME.equals(info.getTable()) && serverCell != null) { + ServerName sn = + ServerName.parseVersionedServerName(CellUtil.cloneValue(serverCell)); + if (sn == null) { + found.set(false); + } else if (tsm.isTableState(RSGROUP_TABLE_NAME, + ZooKeeperProtos.Table.State.ENABLED)) { + try { + ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn); + 
ClientProtos.GetRequest request = + RequestConverter.buildGetRequest(info.getRegionName(), + new Get(ROW_KEY)); + rs.get(null, request); + assignedRegions.add(info); + } catch(Exception ex) { + LOG.debug("Caught exception while verifying group region", ex); + } + } + foundRegions.add(info); + } + if (TableName.NAMESPACE_TABLE_NAME.equals(info.getTable())) { + Cell cell = row.getColumnLatestCell(HConstants.CATALOG_FAMILY, + HConstants.SERVER_QUALIFIER); + ServerName sn = null; + if(cell != null) { + sn = ServerName.parseVersionedServerName(CellUtil.cloneValue(cell)); + } + if (tsm.isTableState(TableName.NAMESPACE_TABLE_NAME, + ZooKeeperProtos.Table.State.ENABLED)) { + try { + ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn); + ClientProtos.GetRequest request = + RequestConverter.buildGetRequest(info.getRegionName(), + new Get(ROW_KEY)); + rs.get(null, request); + nsFound.set(true); + } catch(Exception ex) { + LOG.debug("Caught exception while verifying group region", ex); + } + } + } + } + return true; + } + }; + MetaTableAccessor.fullScan(conn, visitor); + // if no regions in meta then we have to create the table + if (foundRegions.size() < 1 && rootMetaFound && !createSent && nsFound.get()) { + groupInfoManager.createGroupTable(masterServices); + createSent = true; + } + LOG.info("Group table: " + RSGROUP_TABLE_NAME + " isOnline: " + found.get() + + ", regionCount: " + foundRegions.size() + ", assignCount: " + + assignedRegions.size() + ", rootMetaFound: "+rootMetaFound); + found.set(found.get() && assignedRegions.size() == foundRegions.size() + && foundRegions.size() > 0); + } else { + LOG.info("Waiting for catalog tables to come online"); + found.set(false); + } + if (found.get()) { + LOG.debug("With group table online, refreshing cached information."); + groupInfoManager.refresh(true); + isOnline = true; + //flush any inconsistencies between ZK and HTable + groupInfoManager.flushConfig(groupInfoManager.rsGroupMap); + } + } catch(Exception e) 
{ + found.set(false); + LOG.warn("Failed to perform check", e); + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted", e); + } + } + return found.get(); + } + + public boolean isOnline() { + return isOnline; + } + + private boolean isMasterRunning() { + return !masterServices.isAborted() && !masterServices.isStopped(); + } + } + + private void createGroupTable(MasterServices masterServices) throws IOException { + HRegionInfo[] newRegions = + ModifyRegionUtils.createHRegionInfos(RSGROUP_TABLE_DESC, null); + ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(); + masterServices.getMasterProcedureExecutor().submitProcedure( + new CreateTableProcedure( + masterServices.getMasterProcedureExecutor().getEnvironment(), + RSGROUP_TABLE_DESC, + newRegions, + latch)); + latch.await(); + // wait for region to be online + int tries = 600; + while(masterServices.getAssignmentManager().getRegionStates() + .getRegionServerOfRegion(newRegions[0]) == null && tries > 0) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new IOException("Wait interrupted", e); + } + tries--; + } + if(tries <= 0) { + throw new IOException("Failed to create group table."); + } + } + + private void multiMutate(List<Mutation> mutations) + throws IOException { + CoprocessorRpcChannel channel = rsGroupTable.coprocessorService(ROW_KEY); + MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder + = MultiRowMutationProtos.MutateRowsRequest.newBuilder(); + for (Mutation mutation : mutations) { + if (mutation instanceof Put) { + mmrBuilder.addMutationRequest(ProtobufUtil.toMutation( + ClientProtos.MutationProto.MutationType.PUT, mutation)); + } else if (mutation instanceof Delete) { + mmrBuilder.addMutationRequest(ProtobufUtil.toMutation( + ClientProtos.MutationProto.MutationType.DELETE, mutation)); + } else { + throw new DoNotRetryIOException("multiMutate doesn't support " + + mutation.getClass().getName()); + } + } + + 
MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service = + MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel); + try { + service.mutateRows(null, mmrBuilder.build()); + } catch (ServiceException ex) { + ProtobufUtil.toIOException(ex); + } + } + + private void checkGroupName(String groupName) throws ConstraintException { + if(!groupName.matches("[a-zA-Z0-9_]+")) { + throw new ConstraintException("Group name should only contain alphanumeric characters"); + } + } + +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a860e48a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java ---------------------------------------------------------------------- diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java new file mode 100644 index 0000000..530db58 --- /dev/null +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java @@ -0,0 +1,88 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.rsgroup; + +import com.google.common.collect.Lists; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +
//TODO do better encapsulation of SerDe logic from GroupInfoManager and GroupTracker
/**
 * Reads lists of {@link RSGroupInfo} back from the two places they are stored: the group
 * table and the group znodes in zookeeper.
 */
public class RSGroupSerDe {
  private static final Log LOG = LogFactory.getLog(RSGroupSerDe.class);

  public RSGroupSerDe() {

  }

  /**
   * Scans the whole group table and decodes one group per row.
   *
   * @param groupTable the group table to scan
   * @return the decoded groups, in scan order
   * @throws IOException if the scan or the protobuf parsing fails
   */
  public List<RSGroupInfo> retrieveGroupList(Table groupTable) throws IOException {
    List<RSGroupInfo> groupInfos = Lists.newArrayList();
    // fully qualified so that the file's import block does not have to change
    org.apache.hadoop.hbase.client.ResultScanner scanner = groupTable.getScanner(new Scan());
    try {
      for (Result result : scanner) {
        RSGroupProtos.RSGroupInfo proto =
            RSGroupProtos.RSGroupInfo.parseFrom(
                result.getValue(
                    RSGroupInfoManager.META_FAMILY_BYTES,
                    RSGroupInfoManager.META_QUALIFIER_BYTES));
        groupInfos.add(ProtobufUtil.toGroupInfo(proto));
      }
    } finally {
      // the scanner was previously never closed
      scanner.close();
    }
    return groupInfos;
  }

  /**
   * Decodes one group from each child znode of the group base path. Children with no data
   * are skipped; if the base path does not exist the result is empty.
   *
   * @param watcher the zookeeper watcher to read through
   * @param groupBasePath the znode whose children hold the serialized groups
   * @return the decoded groups
   * @throws IOException if zookeeper access fails, the data lacks the protobuf magic prefix,
   *   or the read is interrupted
   */
  public List<RSGroupInfo> retrieveGroupList(ZooKeeperWatcher watcher,
                                             String groupBasePath) throws IOException {
    List<RSGroupInfo> groupInfos = Lists.newArrayList();
    //Overwrite any info stored by table, this takes precedence
    try {
      if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
        for (String znode : ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath)) {
          byte[] data = ZKUtil.getData(watcher, ZKUtil.joinZNode(groupBasePath, znode));
          // NOTE(review): getData presumably returns null for a vanished znode -- guard it
          if (data != null && data.length > 0) {
            ProtobufUtil.expectPBMagicPrefix(data);
            int magicLen = ProtobufUtil.lengthOfPBMagic();
            // third argument is a length, not an end offset
            ByteArrayInputStream bis = new ByteArrayInputStream(
                data, magicLen, data.length - magicLen);
            groupInfos.add(ProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
          }
        }
        LOG.debug("Read ZK GroupInfo count:" + groupInfos.size());
      }
    } catch (KeeperException e) {
      throw new IOException("Failed to read rsGroupZNode",e);
    } catch (DeserializationException e) {
      throw new IOException("Failed to read rsGroupZNode",e);
    } catch (InterruptedException e) {
      // keep the interrupt visible to callers further up
      Thread.currentThread().interrupt();
      throw new IOException("Failed to read rsGroupZNode",e);
    }
    return groupInfos;
  }
}
 http://git-wip-us.apache.org/repos/asf/hbase/blob/a860e48a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java ---------------------------------------------------------------------- diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java new file mode 100644 index 0000000..ec86dda --- /dev/null +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java @@ -0,0 +1,29 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.rsgroup; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.master.LoadBalancer; + + +@InterfaceAudience.Private +public interface RSGroupableBalancer extends LoadBalancer { +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a860e48a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java ---------------------------------------------------------------------- diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java new file mode 100644 index 0000000..1539f73 --- /dev/null +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java @@ -0,0 +1,574 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; +import com.google.common.net.HostAndPort; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableDescriptors; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer; +import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; +import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager; +import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +//TODO use stochastic based load balancer instead +@Category(SmallTests.class) +public class TestRSGroupBasedLoadBalancer { + + private static final Log LOG = 
LogFactory.getLog(TestRSGroupBasedLoadBalancer.class); + private static RSGroupBasedLoadBalancer loadBalancer; + private static SecureRandom rand; + + static String[] groups = new String[] { RSGroupInfo.DEFAULT_GROUP, "dg2", "dg3", + "dg4" }; + static TableName[] tables = + new TableName[] { TableName.valueOf("dt1"), + TableName.valueOf("dt2"), + TableName.valueOf("dt3"), + TableName.valueOf("dt4")}; + static List<ServerName> servers; + static Map<String, RSGroupInfo> groupMap; + static Map<TableName, String> tableMap; + static List<HTableDescriptor> tableDescs; + int[] regionAssignment = new int[] { 2, 5, 7, 10, 4, 3, 1 }; + static int regionId = 0; + + @BeforeClass + public static void beforeAllTests() throws Exception { + rand = new SecureRandom(); + servers = generateServers(7); + groupMap = constructGroupInfo(servers, groups); + tableMap = new HashMap<TableName, String>(); + tableDescs = constructTableDesc(); + Configuration conf = HBaseConfiguration.create(); + conf.set("hbase.regions.slop", "0"); + conf.set("hbase.group.grouploadbalancer.class", SimpleLoadBalancer.class.getCanonicalName()); + loadBalancer = new RSGroupBasedLoadBalancer(getMockedGroupInfoManager()); + loadBalancer.setMasterServices(getMockedMaster()); + loadBalancer.setConf(conf); + loadBalancer.initialize(); + } + + /** + * Test the load balancing algorithm. 
+ * + * Invariant is that all servers of the group should be hosting either floor(average) or + * ceiling(average) + * + * @throws Exception + */ + @Test + public void testBalanceCluster() throws Exception { + Map<ServerName, List<HRegionInfo>> servers = mockClusterServers(); + ArrayListMultimap<String, ServerAndLoad> list = convertToGroupBasedMap(servers); + LOG.info("Mock Cluster : " + printStats(list)); + List<RegionPlan> plans = loadBalancer.balanceCluster(servers); + ArrayListMultimap<String, ServerAndLoad> balancedCluster = reconcile( + list, plans); + LOG.info("Mock Balance : " + printStats(balancedCluster)); + assertClusterAsBalanced(balancedCluster); + } + + /** + * Invariant is that all servers of a group have load between floor(avg) and + * ceiling(avg) number of regions. Every group in the map is verified. + */ + private void assertClusterAsBalanced( + ArrayListMultimap<String, ServerAndLoad> groupLoadMap) { + for (String gName : groupLoadMap.keySet()) { + List<ServerAndLoad> groupLoad = groupLoadMap.get(gName); + int numServers = groupLoad.size(); + int numRegions = 0; + int maxRegions = 0; + int minRegions = Integer.MAX_VALUE; + for (ServerAndLoad server : groupLoad) { + int nr = server.getLoad(); + if (nr > maxRegions) { + maxRegions = nr; + } + if (nr < minRegions) { + minRegions = nr; + } + numRegions += nr; + } + if (maxRegions - minRegions < 2) { + // spread below 2: this group is already balanced, keep checking the remaining groups + continue; + } + int min = numRegions / numServers; + int max = numRegions % numServers == 0 ? min : min + 1; + + for (ServerAndLoad server : groupLoad) { + assertTrue(server.getLoad() <= max); + assertTrue(server.getLoad() >= min); + } + } + } + + /** + * All regions have an assignment. 
+ * + * @param regions + * @param servers + * @param assignments + * @throws java.io.IOException + * @throws java.io.FileNotFoundException + */ + private void assertImmediateAssignment(List<HRegionInfo> regions, + List<ServerName> servers, + Map<HRegionInfo, ServerName> assignments) + throws IOException { + for (HRegionInfo region : regions) { + assertTrue(assignments.containsKey(region)); + ServerName server = assignments.get(region); + TableName tableName = region.getTable(); + + String groupName = + getMockedGroupInfoManager().getRSGroupOfTable(tableName); + assertTrue(StringUtils.isNotEmpty(groupName)); + RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup(groupName); + assertTrue("Region is not correctly assigned to group servers.", + gInfo.containsServer(server.getHostPort())); + } + } + + /** + * Tests the bulk assignment used during cluster startup. + * + * Round-robin. Should yield a balanced cluster so same invariant as the + * load balancer holds, all servers holding either floor(avg) or + * ceiling(avg). 
+ * + * @throws Exception + */ + @Test + public void testBulkAssignment() throws Exception { + List<HRegionInfo> regions = randomRegions(25); + Map<ServerName, List<HRegionInfo>> assignments = loadBalancer + .roundRobinAssignment(regions, servers); + //test empty servers scenario (regions present, no servers) + //this should not throw an NPE + loadBalancer.roundRobinAssignment(regions, + Collections.<ServerName>emptyList()); + //test regular scenario: every server must show up in the plan + assertEquals(servers.size(), assignments.keySet().size()); + for (ServerName sn : assignments.keySet()) { + List<HRegionInfo> regionAssigned = assignments.get(sn); + for (HRegionInfo region : regionAssigned) { + TableName tableName = region.getTable(); + String groupName = + getMockedGroupInfoManager().getRSGroupOfTable(tableName); + assertTrue(StringUtils.isNotEmpty(groupName)); + RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup( + groupName); + assertTrue( + "Region is not correctly assigned to group servers.", + gInfo.containsServer(sn.getHostPort())); + } + } + ArrayListMultimap<String, ServerAndLoad> loadMap = convertToGroupBasedMap(assignments); + assertClusterAsBalanced(loadMap); + } + + /** + * Test the cluster startup bulk assignment which attempts to retain + * assignment info. 
+ * + * @throws Exception + */ + @Test + public void testRetainAssignment() throws Exception { + // Test simple case where all same servers are there + Map<ServerName, List<HRegionInfo>> currentAssignments = mockClusterServers(); + Map<HRegionInfo, ServerName> inputForTest = new HashMap<HRegionInfo, ServerName>(); + for (ServerName sn : currentAssignments.keySet()) { + for (HRegionInfo region : currentAssignments.get(sn)) { + inputForTest.put(region, sn); + } + } + //verify region->null server assignment is handled + inputForTest.put(randomRegions(1).get(0), null); + Map<ServerName, List<HRegionInfo>> newAssignment = loadBalancer + .retainAssignment(inputForTest, servers); + assertRetainedAssignment(inputForTest, servers, newAssignment); + } + + /** + * Asserts a valid retained assignment plan. + * <p> + * Must meet the following conditions: + * <ul> + * <li>Every input region has an assignment, and to an online server + * <li>If a region had an existing assignment to a server with the same + * address as a currently online server, it will be assigned to it + * </ul> + * + * @param existing previous region-to-server assignments; a null server is tolerated + * @param assignment server-to-regions plan being verified + * @throws java.io.IOException + * @throws java.io.FileNotFoundException + */ + private void assertRetainedAssignment( + Map<HRegionInfo, ServerName> existing, List<ServerName> servers, + Map<ServerName, List<HRegionInfo>> assignment) + throws FileNotFoundException, IOException { + // Verify condition 1, every region assigned, and to online server + Set<ServerName> onlineServerSet = new TreeSet<ServerName>(servers); + Set<HRegionInfo> assignedRegions = new TreeSet<HRegionInfo>(); + for (Map.Entry<ServerName, List<HRegionInfo>> a : assignment.entrySet()) { + assertTrue( + "Region assigned to server that was not listed as online", + onlineServerSet.contains(a.getKey())); + for (HRegionInfo r : a.getValue()) + assignedRegions.add(r); + } + assertEquals(existing.size(), assignedRegions.size()); + + // Verify condition 2, every region must be assigned to correct 
server. + Set<String> onlineHostNames = new TreeSet<String>(); + for (ServerName s : servers) { + onlineHostNames.add(s.getHostname()); + } + + for (Map.Entry<ServerName, List<HRegionInfo>> a : assignment.entrySet()) { + ServerName currentServer = a.getKey(); + for (HRegionInfo r : a.getValue()) { + ServerName oldAssignedServer = existing.get(r); + TableName tableName = r.getTable(); + String groupName = + getMockedGroupInfoManager().getRSGroupOfTable(tableName); + assertTrue(StringUtils.isNotEmpty(groupName)); + RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup( + groupName); + assertTrue( + "Region is not correctly assigned to group servers.", + gInfo.containsServer(currentServer.getHostPort())); + if (oldAssignedServer != null + && onlineHostNames.contains(oldAssignedServer + .getHostname())) { + // this region was previously assigned somewhere, and that + // host is still around, then the host must have been in a + // different group. + if (!oldAssignedServer.getHostPort().equals(currentServer.getHostPort())) { + assertFalse(gInfo.containsServer(oldAssignedServer.getHostPort())); + } + } + } + } + } + + private String printStats( + ArrayListMultimap<String, ServerAndLoad> groupBasedLoad) { + StringBuffer sb = new StringBuffer(); + sb.append("\n"); + for (String groupName : groupBasedLoad.keySet()) { + sb.append("Stats for group: " + groupName); + sb.append("\n"); + sb.append(groupMap.get(groupName).getServers()); + sb.append("\n"); + List<ServerAndLoad> groupLoad = groupBasedLoad.get(groupName); + int numServers = groupLoad.size(); + int totalRegions = 0; + sb.append("Per Server Load: \n"); + for (ServerAndLoad sLoad : groupLoad) { + sb.append("Server :" + sLoad.getServerName() + " Load : " + + sLoad.getLoad() + "\n"); + totalRegions += sLoad.getLoad(); + } + sb.append(" Group Statistics : \n"); + float average = (float) totalRegions / numServers; + int max = (int) Math.ceil(average); + int min = (int) Math.floor(average); + sb.append("[srvr=" + 
numServers + " rgns=" + totalRegions + " avg=" + + average + " max=" + max + " min=" + min + "]"); + sb.append("\n"); + sb.append("==============================="); + sb.append("\n"); + } + return sb.toString(); + } + + private ArrayListMultimap<String, ServerAndLoad> convertToGroupBasedMap( + final Map<ServerName, List<HRegionInfo>> serversMap) throws IOException { + ArrayListMultimap<String, ServerAndLoad> loadMap = ArrayListMultimap + .create(); + for (RSGroupInfo gInfo : getMockedGroupInfoManager().listRSGroups()) { + Set<HostAndPort> groupServers = gInfo.getServers(); + for (HostAndPort hostPort : groupServers) { + ServerName actual = null; + for(ServerName entry: servers) { + if(entry.getHostPort().equals(hostPort)) { + actual = entry; + break; + } + } + List<HRegionInfo> regions = serversMap.get(actual); + assertTrue("No load for " + actual, regions != null); + loadMap.put(gInfo.getName(), + new ServerAndLoad(actual, regions.size())); + } + } + return loadMap; + } + + private ArrayListMultimap<String, ServerAndLoad> reconcile( + ArrayListMultimap<String, ServerAndLoad> previousLoad, + List<RegionPlan> plans) { + ArrayListMultimap<String, ServerAndLoad> result = ArrayListMultimap + .create(); + result.putAll(previousLoad); + if (plans != null) { + for (RegionPlan plan : plans) { + ServerName source = plan.getSource(); + updateLoad(result, source, -1); + ServerName destination = plan.getDestination(); + updateLoad(result, destination, +1); + } + } + return result; + } + + private void updateLoad( + ArrayListMultimap<String, ServerAndLoad> previousLoad, + final ServerName sn, final int diff) { + for (String groupName : previousLoad.keySet()) { + ServerAndLoad newSAL = null; + ServerAndLoad oldSAL = null; + for (ServerAndLoad sal : previousLoad.get(groupName)) { + if (ServerName.isSameHostnameAndPort(sn, sal.getServerName())) { + oldSAL = sal; + newSAL = new ServerAndLoad(sn, sal.getLoad() + diff); + break; + } + } + if (newSAL != null) { + 
previousLoad.remove(groupName, oldSAL); + previousLoad.put(groupName, newSAL); + break; + } + } + } + + private Map<ServerName, List<HRegionInfo>> mockClusterServers() throws IOException { + assertTrue(servers.size() == regionAssignment.length); + Map<ServerName, List<HRegionInfo>> assignment = new TreeMap<ServerName, List<HRegionInfo>>(); + for (int i = 0; i < servers.size(); i++) { + int numRegions = regionAssignment[i]; + List<HRegionInfo> regions = assignedRegions(numRegions, servers.get(i)); + assignment.put(servers.get(i), regions); + } + return assignment; + } + + /** + * Generate a list of regions evenly distributed between the tables. + * + * @param numRegions The number of regions to be generated. + * @return List of HRegionInfo. + */ + private List<HRegionInfo> randomRegions(int numRegions) { + List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions); + byte[] start = new byte[16]; + byte[] end = new byte[16]; + rand.nextBytes(start); + rand.nextBytes(end); + int regionIdx = rand.nextInt(tables.length); + for (int i = 0; i < numRegions; i++) { + Bytes.putInt(start, 0, numRegions << 1); + Bytes.putInt(end, 0, (numRegions << 1) + 1); + int tableIndex = (i + regionIdx) % tables.length; + HRegionInfo hri = new HRegionInfo( + tables[tableIndex], start, end, false, regionId++); + regions.add(hri); + } + return regions; + } + + /** + * Generate assigned regions to a given server using group information. + * + * @param numRegions the num regions to generate + * @param sn the servername + * @return the list of regions + * @throws java.io.IOException Signals that an I/O exception has occurred. 
+ */ + private List<HRegionInfo> assignedRegions(int numRegions, ServerName sn) throws IOException { + List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions); + byte[] start = new byte[16]; + byte[] end = new byte[16]; + Bytes.putInt(start, 0, numRegions << 1); + Bytes.putInt(end, 0, (numRegions << 1) + 1); + for (int i = 0; i < numRegions; i++) { + TableName tableName = getTableName(sn); + HRegionInfo hri = new HRegionInfo( + tableName, start, end, false, + regionId++); + regions.add(hri); + } + return regions; + } + + private static List<ServerName> generateServers(int numServers) { + List<ServerName> servers = new ArrayList<ServerName>(numServers); + for (int i = 0; i < numServers; i++) { + String host = "server" + rand.nextInt(100000); + int port = rand.nextInt(60000); + servers.add(ServerName.valueOf(host, port, -1)); + } + return servers; + } + + /** + * Construct group info, with each group having at least one server. + * + * @param servers the servers + * @param groups the groups + * @return the map + */ + private static Map<String, RSGroupInfo> constructGroupInfo( + List<ServerName> servers, String[] groups) { + assertTrue(servers != null); + assertTrue(servers.size() >= groups.length); + int index = 0; + Map<String, RSGroupInfo> groupMap = new HashMap<String, RSGroupInfo>(); + for (String grpName : groups) { + RSGroupInfo RSGroupInfo = new RSGroupInfo(grpName); + RSGroupInfo.addServer(servers.get(index).getHostPort()); + groupMap.put(grpName, RSGroupInfo); + index++; + } + while (index < servers.size()) { + int grpIndex = rand.nextInt(groups.length); + groupMap.get(groups[grpIndex]).addServer( + servers.get(index).getHostPort()); + index++; + } + return groupMap; + } + + /** + * Construct table descriptors evenly distributed between the groups. 
+ * + * @return the list + */ + private static List<HTableDescriptor> constructTableDesc() { + List<HTableDescriptor> tds = Lists.newArrayList(); + int index = rand.nextInt(groups.length); + for (int i = 0; i < tables.length; i++) { + HTableDescriptor htd = new HTableDescriptor(tables[i]); + int grpIndex = (i + index) % groups.length ; + String groupName = groups[grpIndex]; + tableMap.put(tables[i], groupName); + tds.add(htd); + } + return tds; + } + + private static MasterServices getMockedMaster() throws IOException { + TableDescriptors tds = Mockito.mock(TableDescriptors.class); + Mockito.when(tds.get(tables[0])).thenReturn(tableDescs.get(0)); + Mockito.when(tds.get(tables[1])).thenReturn(tableDescs.get(1)); + Mockito.when(tds.get(tables[2])).thenReturn(tableDescs.get(2)); + Mockito.when(tds.get(tables[3])).thenReturn(tableDescs.get(3)); + MasterServices services = Mockito.mock(HMaster.class); + Mockito.when(services.getTableDescriptors()).thenReturn(tds); + AssignmentManager am = Mockito.mock(AssignmentManager.class); + Mockito.when(services.getAssignmentManager()).thenReturn(am); + return services; + } + + private static RSGroupInfoManager getMockedGroupInfoManager() throws IOException { + RSGroupInfoManager gm = Mockito.mock(RSGroupInfoManager.class); + Mockito.when(gm.getRSGroup(groups[0])).thenReturn( + groupMap.get(groups[0])); + Mockito.when(gm.getRSGroup(groups[1])).thenReturn( + groupMap.get(groups[1])); + Mockito.when(gm.getRSGroup(groups[2])).thenReturn( + groupMap.get(groups[2])); + Mockito.when(gm.getRSGroup(groups[3])).thenReturn( + groupMap.get(groups[3])); + Mockito.when(gm.listRSGroups()).thenReturn( + Lists.newLinkedList(groupMap.values())); + Mockito.when(gm.isOnline()).thenReturn(true); + Mockito.when(gm.getRSGroupOfTable(Mockito.any(TableName.class))) + .thenAnswer(new Answer<String>() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + return tableMap.get(invocation.getArguments()[0]); + } + }); + return 
gm; + } + + private TableName getTableName(ServerName sn) throws IOException { + TableName tableName = null; + RSGroupInfoManager gm = getMockedGroupInfoManager(); + RSGroupInfo groupOfServer = null; + for(RSGroupInfo gInfo : gm.listRSGroups()){ + if(gInfo.containsServer(sn.getHostPort())){ + groupOfServer = gInfo; + break; + } + } + + for(HTableDescriptor desc : tableDescs){ // no break: the last table mapped to the server's group wins + if(gm.getRSGroupOfTable(desc.getTableName()).equals(groupOfServer.getName())){ // exact group-name match, not a suffix match + tableName = desc.getTableName(); + } + } + return tableName; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a860e48a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java ---------------------------------------------------------------------- diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java new file mode 100644 index 0000000..34add63 --- /dev/null +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java @@ -0,0 +1,287 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.rsgroup; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import com.google.common.net.HostAndPort; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.Waiter.Predicate; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({MediumTests.class}) +public class 
TestRSGroups extends TestRSGroupsBase { + protected static final Log LOG = LogFactory.getLog(TestRSGroups.class); + private static HMaster master; + private static boolean init = false; + private static RSGroupAdminEndpoint RSGroupAdminEndpoint; + + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + TEST_UTIL.getConfiguration().set( + HConstants.HBASE_MASTER_LOADBALANCER_CLASS, + RSGroupBasedLoadBalancer.class.getName()); + TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + RSGroupAdminEndpoint.class.getName()); + TEST_UTIL.getConfiguration().setBoolean( + HConstants.ZOOKEEPER_USEMULTI, + true); + TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE); + TEST_UTIL.getConfiguration().set( + ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, + ""+NUM_SLAVES_BASE); + + admin = TEST_UTIL.getHBaseAdmin(); + cluster = TEST_UTIL.getHBaseCluster(); + master = ((MiniHBaseCluster)cluster).getMaster(); + + //wait for balancer to come online + TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + return master.isInitialized() && + ((RSGroupBasedLoadBalancer) master.getLoadBalancer()).isOnline(); + } + }); + admin.setBalancerRunning(false,true); + rsGroupAdmin = new VerifyingRSGroupAdminClient(rsGroupAdmin.newClient(TEST_UTIL.getConnection()), + TEST_UTIL.getConfiguration()); + RSGroupAdminEndpoint = + master.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class).get(0); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void beforeMethod() throws Exception { + if(!init) { + init = true; + afterMethod(); + } + + } + + @After + public void afterMethod() throws Exception { + deleteTableIfNecessary(); + deleteNamespaceIfNecessary(); + deleteGroups(); + + int missing = NUM_SLAVES_BASE - getNumServers(); + LOG.info("Restoring servers: 
"+missing); + for(int i=0; i<missing; i++) { + ((MiniHBaseCluster)cluster).startRegionServer(); + } + + rsGroupAdmin.addRSGroup("master"); + ServerName masterServerName = + ((MiniHBaseCluster)cluster).getMaster().getServerName(); + + try { + rsGroupAdmin.moveServers( + Sets.newHashSet(masterServerName.getHostPort()), + "master"); + } catch (Exception ex) { + // ignore + } + TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + LOG.info("Waiting for cleanup to finish " + rsGroupAdmin.listRSGroups()); + //Might be greater since moving servers back to default + //is after starting a server + + return rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP).getServers().size() + == NUM_SLAVES_BASE; + } + }); + } + + @Test + public void testBasicStartUp() throws IOException { + RSGroupInfo defaultInfo = rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP); + assertEquals(4, defaultInfo.getServers().size()); + // Assignment of root and meta regions. 
+ int count = master.getAssignmentManager().getRegionStates().getRegionAssignments().size(); + //3 meta,namespace, group + assertEquals(3, count); + } + + @Test + public void testNamespaceCreateAndAssign() throws Exception { + LOG.info("testNamespaceCreateAndAssign"); + String nsName = tablePrefix+"_foo"; + final TableName tableName = TableName.valueOf(nsName, tablePrefix + "_testCreateAndAssign"); + RSGroupInfo appInfo = addGroup(rsGroupAdmin, "appInfo", 1); + admin.createNamespace(NamespaceDescriptor.create(nsName) + .addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, "appInfo").build()); + final HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor("f")); + admin.createTable(desc); + //wait for created table to be assigned + TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + return getTableRegionMap().get(desc.getTableName()) != null; + } + }); + ServerName targetServer = + ServerName.parseServerName(appInfo.getServers().iterator().next().toString()); + AdminProtos.AdminService.BlockingInterface rs = admin.getConnection().getAdmin(targetServer); + //verify it was assigned to the right group + Assert.assertEquals(1, ProtobufUtil.getOnlineRegions(rs).size()); + } + + @Test + public void testDefaultNamespaceCreateAndAssign() throws Exception { + LOG.info("testDefaultNamespaceCreateAndAssign"); + final byte[] tableName = Bytes.toBytes(tablePrefix + "_testCreateAndAssign"); + admin.modifyNamespace(NamespaceDescriptor.create("default") + .addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, "default").build()); + final HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor("f")); + admin.createTable(desc); + //wait for created table to be assigned + TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + return 
getTableRegionMap().get(desc.getTableName()) != null; + } + }); + } + + @Test + public void testNamespaceConstraint() throws Exception { + String nsName = tablePrefix+"_foo"; + String groupName = tablePrefix+"_foo"; + LOG.info("testNamespaceConstraint"); + rsGroupAdmin.addRSGroup(groupName); + admin.createNamespace(NamespaceDescriptor.create(nsName) + .addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, groupName) + .build()); + //test removing a referenced group + try { + rsGroupAdmin.removeRSGroup(groupName); + fail("Expected a constraint exception"); + } catch (IOException ex) { // expected: the group is still referenced by the namespace + } + //test modify group + //changing with the same name is fine + admin.modifyNamespace( + NamespaceDescriptor.create(nsName) + .addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, groupName) + .build()); + String anotherGroup = tablePrefix+"_anotherGroup"; + rsGroupAdmin.addRSGroup(anotherGroup); + //drop the namespace and its group, then test creating a namespace with a non-existent group + admin.deleteNamespace(nsName); + rsGroupAdmin.removeRSGroup(groupName); + try { + admin.createNamespace(NamespaceDescriptor.create(nsName) + .addConfiguration(RSGroupInfo.NAMESPACEDESC_PROP_GROUP, "foo") + .build()); + fail("Expected a constraint exception"); + } catch (IOException ex) { // expected: no group named "foo" was added in this test + } + } + + @Test + public void testGroupInfoMultiAccessing() throws Exception { + RSGroupInfoManager manager = RSGroupAdminEndpoint.getGroupInfoManager(); + final RSGroupInfo defaultGroup = manager.getRSGroup("default"); + // getRSGroup updates default group's server list + // this process must not affect other threads iterating the list + Iterator<HostAndPort> it = defaultGroup.getServers().iterator(); + manager.getRSGroup("default"); + it.next(); + } + + @Test + public void testMisplacedRegions() throws Exception { + final TableName tableName = TableName.valueOf(tablePrefix+"_testMisplacedRegions"); + LOG.info("testMisplacedRegions"); + + final RSGroupInfo RSGroupInfo = addGroup(rsGroupAdmin, "testMisplacedRegions", 1); + + TEST_UTIL.createMultiRegionTable(tableName, 
new byte[]{'f'}, 15); + TEST_UTIL.waitUntilAllRegionsAssigned(tableName); + + RSGroupAdminEndpoint.getGroupInfoManager() + .moveTables(Sets.newHashSet(tableName), RSGroupInfo.getName()); + + assertTrue(rsGroupAdmin.balanceRSGroup(RSGroupInfo.getName())); + + TEST_UTIL.waitFor(60000, new Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + ServerName serverName = + ServerName.valueOf(RSGroupInfo.getServers().iterator().next().toString(), 1); + return admin.getConnection().getAdmin() + .getOnlineRegions(serverName).size() == 15; + } + }); + } +}