Merge branch '1.8'
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/80762e9f Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/80762e9f Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/80762e9f Branch: refs/heads/master Commit: 80762e9fcb25c1772510119d466c3352cec32f82 Parents: 32dbd42 6c20e50 Author: Ivan Bella <i...@bella.name> Authored: Wed Jul 19 18:51:17 2017 -0400 Committer: Ivan Bella <i...@bella.name> Committed: Wed Jul 19 18:51:17 2017 -0400 ---------------------------------------------------------------------- .../balancer/HostRegexTableLoadBalancer.java | 117 +++++++++++++++++-- .../BaseHostRegexTableLoadBalancerTest.java | 65 ++++++++++- ...gexTableLoadBalancerReconfigurationTest.java | 2 +- .../HostRegexTableLoadBalancerTest.java | 61 ++++++++-- 4 files changed, 224 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/80762e9f/server/base/src/main/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancer.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancer.java index 945efd0,dbf03d0..c3d15c8 --- a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancer.java @@@ -31,16 -32,19 +32,20 @@@ import java.util.SortedMap import java.util.TreeMap; import java.util.regex.Pattern; + import com.google.common.collect.HashMultimap; + import com.google.common.collect.Iterables; + import com.google.common.collect.Multimap; import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.client.impl.Table; import org.apache.accumulo.core.conf.ConfigurationObserver; +import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.impl.KeyExtent; + import org.apache.accumulo.core.master.thrift.TableInfo; import org.apache.accumulo.core.master.thrift.TabletServerStatus; import org.apache.accumulo.core.tabletserver.thrift.TabletStats; +import org.apache.accumulo.server.AccumuloServerContext; import org.apache.accumulo.server.conf.ServerConfiguration; -import org.apache.accumulo.server.conf.ServerConfigurationFactory; import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.master.state.TabletMigration; import org.apache.commons.lang.builder.ToStringBuilder; @@@ -77,15 -88,24 +84,24 @@@ public class HostRegexTableLoadBalance + "balancer.host.regex.concurrent.migrations"; private static final int HOST_BALANCER_REGEX_MAX_MIGRATIONS_DEFAULT = 250; protected static final String DEFAULT_POOL = "HostTableLoadBalancer.ALL"; + private static final int DEFAULT_OUTSTANDING_MIGRATIONS = 0; + public static final String HOST_BALANCER_OUTSTANDING_MIGRATIONS_KEY = Property.TABLE_ARBITRARY_PROP_PREFIX.getKey() + + "balancer.host.regex.max.outstanding.migrations"; - protected long oobCheckMillis = AccumuloConfiguration.getTimeInMillis(HOST_BALANCER_OOB_DEFAULT); + protected long oobCheckMillis = ConfigurationTypeHelper.getTimeInMillis(HOST_BALANCER_OOB_DEFAULT); + private static final long ONE_HOUR = 60 * 60 * 1000; - private static final Set<KeyExtent> EMPTY_MIGRATIONS = Collections.EMPTY_SET; ++ private static final Set<KeyExtent> EMPTY_MIGRATIONS = Collections.emptySet(); + - private Map<String,String> tableIdToTableName = null; + private Map<Table.ID,String> tableIdToTableName = null; private Map<String,Pattern> poolNameToRegexPattern = null; private volatile long lastOOBCheck = System.currentTimeMillis(); - private boolean isIpBasedRegex = false; + private volatile boolean isIpBasedRegex = false; private Map<String,SortedMap<TServerInstance,TabletServerStatus>> pools = new HashMap<>(); - private int maxTServerMigrations = HOST_BALANCER_REGEX_MAX_MIGRATIONS_DEFAULT; + private volatile int maxTServerMigrations = HOST_BALANCER_REGEX_MAX_MIGRATIONS_DEFAULT; + private volatile int maxOutstandingMigrations = DEFAULT_OUTSTANDING_MIGRATIONS; - private final Map<KeyExtent,TabletMigration> migrationsFromLastPass = new HashMap<KeyExtent,TabletMigration>(); - private final Map<String,Long> tableToTimeSinceNoMigrations = new HashMap<String,Long>(); ++ private final Map<KeyExtent,TabletMigration> migrationsFromLastPass = new HashMap<>(); ++ private final Map<String,Long> tableToTimeSinceNoMigrations = new HashMap<>(); /** * Group the set of current tservers by pool name. Tservers that don't match a regex are put into a default pool. This could be expensive in the terms of the @@@ -210,6 -229,10 +226,10 @@@ if (null != migrations) { maxTServerMigrations = Integer.parseInt(migrations); } - String outstanding = conf.getConfiguration().get(HOST_BALANCER_OUTSTANDING_MIGRATIONS_KEY); ++ String outstanding = conf.getSystemConfiguration().get(HOST_BALANCER_OUTSTANDING_MIGRATIONS_KEY); + if (null != outstanding) { + this.maxOutstandingMigrations = Integer.parseInt(outstanding); + } LOG.info("{}", this); } @@@ -359,8 -398,34 +390,34 @@@ } if (migrations != null && migrations.size() > 0) { - LOG.warn("Not balancing tables due to {} outstanding migrations", migrations.size()); - return minBalanceTime; + if (migrations.size() >= maxOutstandingMigrations) { + LOG.warn("Not balancing tables due to {} outstanding migrations", migrations.size()); + if (LOG.isTraceEnabled()) { + LOG.trace("Sample up to 10 outstanding migrations: {}", Iterables.limit(migrations, 10)); + } + return minBalanceTime; + } + + LOG.debug("Current outstanding migrations of {} being applied", migrations.size()); + if (LOG.isTraceEnabled()) { + LOG.trace("Sample up to 10 outstanding migrations: {}", Iterables.limit(migrations, 10)); + } + migrationsFromLastPass.keySet().retainAll(migrations); - SortedMap<TServerInstance,TabletServerStatus> currentCopy = new TreeMap(current); ++ SortedMap<TServerInstance,TabletServerStatus> currentCopy = new TreeMap<>(current); + Multimap<TServerInstance,String> serverTableIdCopied = HashMultimap.create(); + for (TabletMigration migration : migrationsFromLastPass.values()) { + TableInfo fromInfo = getTableInfo(currentCopy, serverTableIdCopied, migration.tablet.getTableId().toString(), migration.oldServer); + if (fromInfo != null) { + fromInfo.setOnlineTablets(fromInfo.getOnlineTablets() - 1); + } + TableInfo toInfo = getTableInfo(currentCopy, serverTableIdCopied, migration.tablet.getTableId().toString(), migration.newServer); + if (toInfo != null) { + toInfo.setOnlineTablets(toInfo.getOnlineTablets() + 1); + } + } + migrations = EMPTY_MIGRATIONS; + } else { + migrationsFromLastPass.clear(); } for (String s : tableIdMap.values()) { @@@ -373,10 -437,20 +430,20 @@@ continue; } ArrayList<TabletMigration> newMigrations = new ArrayList<>(); - getBalancerForTable(s).balance(currentView, migrations, newMigrations); + getBalancerForTable(tableId).balance(currentView, migrations, newMigrations); + if (newMigrations.isEmpty()) { + tableToTimeSinceNoMigrations.remove(s); + } else if (tableToTimeSinceNoMigrations.containsKey(s)) { + if ((now - tableToTimeSinceNoMigrations.get(s)) > ONE_HOUR) { + LOG.warn("We have been consistently producing migrations for {}: {}", tableName, Iterables.limit(newMigrations, 10)); + } + } else { + tableToTimeSinceNoMigrations.put(s, now); + } + migrationsOut.addAll(newMigrations); - if (migrationsOut.size() > this.maxTServerMigrations) { + if (migrationsOut.size() >= this.maxTServerMigrations) { break; } } @@@ -384,9 -463,36 +456,36 @@@ return minBalanceTime; } + /** + * Get a mutable table info for the specified table and server + */ + private TableInfo getTableInfo(SortedMap<TServerInstance,TabletServerStatus> currentCopy, Multimap<TServerInstance,String> serverTableIdCopied, + String tableId, TServerInstance server) { + TableInfo newInfo = null; + if (currentCopy.containsKey(server)) { + Map<String,TableInfo> newTableMap = currentCopy.get(server).getTableMap(); + if (newTableMap != null) { + newInfo = newTableMap.get(tableId); + if (newInfo != null) { + Collection<String> tableIdCopied = serverTableIdCopied.get(server); + if (tableIdCopied.isEmpty()) { + newTableMap = new HashMap<String,TableInfo>(newTableMap); + currentCopy.get(server).setTableMap(newTableMap); + } + if (!tableIdCopied.contains(tableId)) { + newInfo = new TableInfo(newInfo); + newTableMap.put(tableId, newInfo); + tableIdCopied.add(tableId); + } + } + } + } + return newInfo; + } + @Override public void propertyChanged(String key) { - parseConfiguration(this.configuration); + parseConfiguration(context.getServerConfigurationFactory()); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/80762e9f/server/base/src/test/java/org/apache/accumulo/server/master/balancer/BaseHostRegexTableLoadBalancerTest.java ---------------------------------------------------------------------- diff --cc server/base/src/test/java/org/apache/accumulo/server/master/balancer/BaseHostRegexTableLoadBalancerTest.java index b44385b,e44491d..83d6312 --- a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/BaseHostRegexTableLoadBalancerTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/BaseHostRegexTableLoadBalancerTest.java @@@ -34,23 -34,25 +34,27 @@@ import org.apache.accumulo.core.client. import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.impl.Namespace; +import org.apache.accumulo.core.client.impl.Table; import org.apache.accumulo.core.client.impl.TableOperationsImpl; + import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.impl.KeyExtent; + import org.apache.accumulo.core.master.thrift.TableInfo; import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.server.conf.NamespaceConfiguration; + import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.server.conf.ServerConfigurationFactory; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.hadoop.io.Text; + import org.apache.thrift.TException; import org.easymock.EasyMock; -import com.google.common.base.Predicate; - public abstract class BaseHostRegexTableLoadBalancerTest extends HostRegexTableLoadBalancer { protected static class TestInstance implements Instance { @@@ -150,10 -162,8 +157,10 @@@ } @Override - public TableConfiguration getTableConfiguration(Table.ID tableId) { - public TableConfiguration getTableConfiguration(final String tableId) { - return new TableConfiguration(getInstance(), tableId, null) { ++ public TableConfiguration getTableConfiguration(final Table.ID tableId) { + // create a dummy namespaceConfiguration to satisfy requireNonNull in TableConfiguration constructor + NamespaceConfiguration dummyConf = new NamespaceConfiguration(Namespace.ID.DEFAULT, this.instance, DefaultConfiguration.getInstance()); + return new TableConfiguration(this.instance, tableId, dummyConf) { @Override public String get(Property property) { return DEFAULT_TABLE_PROPERTIES.get(property.name()); @@@ -171,10 -181,28 +178,28 @@@ } } + protected static final TestTable FOO = new TestTable("foo", new Table.ID("1")); + protected static final TestTable BAR = new TestTable("bar", new Table.ID("2")); + protected static final TestTable BAZ = new TestTable("baz", new Table.ID("3")); + + protected class TestDefaultBalancer extends DefaultLoadBalancer { + @Override - public List<TabletStats> getOnlineTabletsForTable(TServerInstance tserver, String tableId) throws ThriftSecurityException, TException { ++ public List<TabletStats> getOnlineTabletsForTable(TServerInstance tserver, Table.ID tableId) throws ThriftSecurityException, TException { + String tableName = idToTableName(tableId); + TServerInstance initialLocation = initialTableLocation.get(tableName); + if (tserver.equals(initialLocation)) { + List<TabletStats> list = new ArrayList<TabletStats>(5); + for (KeyExtent extent : tableExtents.get(tableName)) { + TabletStats stats = new TabletStats(); + stats.setExtent(extent.toThrift()); + list.add(stats); + } + return list; + } + return null; + } + } + - protected static final Table FOO = new Table("foo", "1"); - protected static final Table BAR = new Table("bar", "2"); - protected static final Table BAZ = new Table("baz", "3"); - protected final TestInstance instance = new TestInstance(); protected final TestServerConfigurationFactory factory = new TestServerConfigurationFactory(instance); protected final Map<String,String> servers = new HashMap<>(15); @@@ -252,6 -286,18 +283,18 @@@ } } - protected String idToTableName(String id) { ++ protected String idToTableName(Table.ID id) { + if (id.equals(FOO.getId())) { + return FOO.getTableName(); + } else if (id.equals(BAR.getId())) { + return BAR.getTableName(); + } else if (id.equals(BAZ.getId())) { + return BAZ.getTableName(); + } else { + return null; + } + } + @Override protected TableOperations getTableOperations() { return new TableOperationsImpl(EasyMock.createMock(ClientContext.class)) { @@@ -276,8 -322,8 +319,8 @@@ } @Override - protected TabletBalancer getBalancerForTable(String table) { + protected TabletBalancer getBalancerForTable(Table.ID table) { - return new DefaultLoadBalancer(); + return new TestDefaultBalancer(); } @Override @@@ -293,7 -339,21 +336,21 @@@ String base = "192.168.0."; TreeMap<TServerInstance,TabletServerStatus> current = new TreeMap<>(); for (int i = 1; i <= numTservers; i++) { - current.put(new TServerInstance(base + i + ":9997", 1), new TabletServerStatus()); + TabletServerStatus status = new TabletServerStatus(); + Map<String,TableInfo> tableMap = new HashMap<String,TableInfo>(); - tableMap.put(FOO.getId(), new TableInfo()); - tableMap.put(BAR.getId(), new TableInfo()); - tableMap.put(BAZ.getId(), new TableInfo()); ++ tableMap.put(FOO.getId().canonicalID(), new TableInfo()); ++ tableMap.put(BAR.getId().canonicalID(), new TableInfo()); ++ tableMap.put(BAZ.getId().canonicalID(), new TableInfo()); + status.setTableMap(tableMap); + current.put(new TServerInstance(base + i + ":9997", 1), status); + } + // now put all of the tablets on one server + for (Map.Entry<String,TServerInstance> entry : initialTableLocation.entrySet()) { + TabletServerStatus status = current.get(entry.getValue()); + if (status != null) { + String tableId = getTableOperations().tableIdMap().get(entry.getKey()); + status.getTableMap().get(tableId).setOnlineTablets(5); + } } return current; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/80762e9f/server/base/src/test/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancerReconfigurationTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/80762e9f/server/base/src/test/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancerTest.java ---------------------------------------------------------------------- diff --cc server/base/src/test/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancerTest.java index ca128e2,120aab9..941021b --- a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancerTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancerTest.java @@@ -25,15 -25,11 +25,14 @@@ import java.util.Map import java.util.Map.Entry; import java.util.Set; import java.util.SortedMap; +import java.util.function.Predicate; import java.util.regex.Pattern; -import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.client.impl.Namespaces; +import org.apache.accumulo.core.client.impl.Table; - import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.data.thrift.TKeyExtent; @@@ -53,8 -50,13 +52,10 @@@ public class HostRegexTableLoadBalancer @Test public void testInit() { - init(factory); + init(new AccumuloServerContext(instance, factory)); - Assert.assertEquals("OOB check interval value is incorrect", 2000, this.getOobCheckMillis()); + Assert.assertEquals("OOB check interval value is incorrect", 7000, this.getOobCheckMillis()); - @SuppressWarnings("deprecation") - long poolRecheckMillis = this.getPoolRecheckMillis(); - Assert.assertEquals("Pool check interval value is incorrect", 0, poolRecheckMillis); + Assert.assertEquals("Max migrations is incorrect", 4, this.getMaxMigrations()); + Assert.assertEquals("Max outstanding migrations is incorrect", 10, this.getMaxOutstandingMigrations()); Assert.assertFalse(isIpBasedRegex()); Map<String,Pattern> patterns = this.getPoolNameToRegexPattern(); Assert.assertEquals(2, patterns.size()); @@@ -74,12 -76,58 +75,58 @@@ } @Test - public void testBalanceWithMigrations() { - List<TabletMigration> migrations = new ArrayList<>(); + public void testBalance() { - init((ServerConfiguration) factory); - Set<KeyExtent> migrations = new HashSet<KeyExtent>(); - List<TabletMigration> migrationsOut = new ArrayList<TabletMigration>(); + init(new AccumuloServerContext(instance, factory)); - long wait = this.balance(Collections.unmodifiableSortedMap(createCurrent(2)), Collections.singleton(new KeyExtent()), migrations); ++ Set<KeyExtent> migrations = new HashSet<>(); ++ List<TabletMigration> migrationsOut = new ArrayList<>(); + long wait = this.balance(Collections.unmodifiableSortedMap(createCurrent(15)), migrations, migrationsOut); + Assert.assertEquals(20000, wait); + // should balance four tablets in one of the tables before reaching max + Assert.assertEquals(4, migrationsOut.size()); + + // now balance again passing in the new migrations + for (TabletMigration m : migrationsOut) { + migrations.add(m.tablet); + } + migrationsOut.clear(); + wait = this.balance(Collections.unmodifiableSortedMap(createCurrent(15)), migrations, migrationsOut); + Assert.assertEquals(20000, wait); + // should balance four tablets in one of the other tables before reaching max + Assert.assertEquals(4, migrationsOut.size()); + + // now balance again passing in the new migrations + for (TabletMigration m : migrationsOut) { + migrations.add(m.tablet); + } + migrationsOut.clear(); + wait = this.balance(Collections.unmodifiableSortedMap(createCurrent(15)), migrations, migrationsOut); + Assert.assertEquals(20000, wait); + // should balance four tablets in one of the other tables before reaching max + Assert.assertEquals(4, migrationsOut.size()); + + // now balance again passing in the new migrations + for (TabletMigration m : migrationsOut) { + migrations.add(m.tablet); + } + migrationsOut.clear(); + wait = this.balance(Collections.unmodifiableSortedMap(createCurrent(15)), migrations, migrationsOut); + Assert.assertEquals(20000, wait); + // no more balancing to do + Assert.assertEquals(0, migrationsOut.size()); + } + + @Test + public void testBalanceWithTooManyOutstandingMigrations() { + List<TabletMigration> migrationsOut = new ArrayList<>(); - init(factory); ++ init(new AccumuloServerContext(instance, factory)); + // lets say we already have migrations ongoing for the FOO and BAR table extends (should be 5 of each of them) for a total of 10 - Set<KeyExtent> migrations = new HashSet<KeyExtent>(); ++ Set<KeyExtent> migrations = new HashSet<>(); + migrations.addAll(tableExtents.get(FOO.getTableName())); + migrations.addAll(tableExtents.get(BAR.getTableName())); + long wait = this.balance(Collections.unmodifiableSortedMap(createCurrent(15)), migrations, migrationsOut); Assert.assertEquals(20000, wait); - Assert.assertEquals(0, migrations.size()); + // no migrations should have occurred as 10 is the maxOutstandingMigrations + Assert.assertEquals(0, migrationsOut.size()); } @Test @@@ -361,7 -406,7 +408,7 @@@ } @Override - public List<TabletStats> getOnlineTabletsForTable(TServerInstance tserver, Table.ID tableId) throws ThriftSecurityException, TException { - public List<TabletStats> getOnlineTabletsForTable(TServerInstance tserver, String tableId) throws ThriftSecurityException, TException { ++ public List<TabletStats> getOnlineTabletsForTable(TServerInstance tserver, Table.ID tableId) throws TException { // Report incorrect information so that balance will create an assignment List<TabletStats> tablets = new ArrayList<>(); if (tableId.equals(BAR.getId()) && tserver.host().equals("192.168.0.1")) {