Merge branch '1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT
Conflicts:
server/base/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
server/src/main/java/org/apache/accumulo/server/master/Master.java
server/tserver/pom.xml
test/system/auto/stress/migrations.py
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/dedc9cdd
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/dedc9cdd
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/dedc9cdd
Branch: refs/heads/1.6.1-SNAPSHOT
Commit: dedc9cddaaa46558f542b4d9d6d357316815378f
Parents: 39f405a 6bbe121
Author: Sean Busbey <[email protected]>
Authored: Fri Aug 1 19:31:51 2014 -0500
Committer: Sean Busbey <[email protected]>
Committed: Fri Aug 1 19:31:51 2014 -0500
----------------------------------------------------------------------
.../master/balancer/ChaoticLoadBalancer.java | 14 ++-
.../master/balancer/DefaultLoadBalancer.java | 12 ++-
.../server/master/balancer/TabletBalancer.java | 91 ++++++++++++++++++++
3 files changed, 113 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/dedc9cdd/server/base/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
----------------------------------------------------------------------
diff --cc
server/base/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
index ec3371c,0000000..5767934
mode 100644,000000..100644
---
a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
@@@ -1,143 -1,0 +1,151 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.balancer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletMigration;
++import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+
+/**
+ * A chaotic load balancer used for testing. It constantly shuffles tablets,
preventing them from resting in a single location for very long. This is not
+ * designed for performance; do not use on production systems. I'm calling it
the LokiLoadBalancer.
+ */
+public class ChaoticLoadBalancer extends TabletBalancer {
++ private static final Logger log =
Logger.getLogger(ChaoticLoadBalancer.class);
+ Random r = new Random();
+
+ @Override
+ public void getAssignments(SortedMap<TServerInstance,TabletServerStatus>
current, Map<KeyExtent,TServerInstance> unassigned,
+ Map<KeyExtent,TServerInstance> assignments) {
+ long total = assignments.size() + unassigned.size();
+ long avg = (long) Math.ceil(((double) total) / current.size());
+ Map<TServerInstance,Long> toAssign = new HashMap<TServerInstance,Long>();
+ List<TServerInstance> tServerArray = new ArrayList<TServerInstance>();
+ for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
+ long numTablets = 0;
+ for (TableInfo ti : e.getValue().getTableMap().values()) {
+ numTablets += ti.tablets;
+ }
+ if (numTablets < avg) {
+ tServerArray.add(e.getKey());
+ toAssign.put(e.getKey(), avg - numTablets);
+ }
+ }
+
+ for (KeyExtent ke : unassigned.keySet()) {
+ int index = r.nextInt(tServerArray.size());
+ TServerInstance dest = tServerArray.get(index);
+ assignments.put(ke, dest);
+ long remaining = toAssign.get(dest).longValue() - 1;
+ if (remaining == 0) {
+ tServerArray.remove(index);
+ toAssign.remove(dest);
+ } else {
+ toAssign.put(dest, remaining);
+ }
+ }
+ }
+
++ protected final OutstandingMigrations outstandingMigrations = new
OutstandingMigrations(log);
++
+ /**
+ * Will balance randomly, maintaining distribution
+ */
+ @Override
+ public long balance(SortedMap<TServerInstance,TabletServerStatus> current,
Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
+ Map<TServerInstance,Long> numTablets = new
HashMap<TServerInstance,Long>();
+ List<TServerInstance> underCapacityTServer = new
ArrayList<TServerInstance>();
-
- if (!migrations.isEmpty())
++
++ if (!migrations.isEmpty()) {
++ outstandingMigrations.migrations = migrations;
++ constraintNotMet(outstandingMigrations);
+ return 100;
-
++ }
++ resetBalancerErrors();
++
+ boolean moveMetadata = r.nextInt(4) == 0;
+ long totalTablets = 0;
+ for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
+ long tabletCount = 0;
+ for (TableInfo ti : e.getValue().getTableMap().values()) {
+ tabletCount += ti.tablets;
+ }
+ numTablets.put(e.getKey(), tabletCount);
+ underCapacityTServer.add(e.getKey());
+ totalTablets += tabletCount;
+ }
+ // totalTablets is fuzzy due to asynchronicity of the stats
+ // *1.2 to handle fuzziness, and prevent locking for 'perfect' balancing
scenarios
+ long avg = (long) Math.ceil(((double) totalTablets) / current.size() *
1.2);
+
+ for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
+ for (String table : e.getValue().getTableMap().keySet()) {
+ if (!moveMetadata && MetadataTable.NAME.equals(table))
+ continue;
+ try {
+ for (TabletStats ts : getOnlineTabletsForTable(e.getKey(), table)) {
+ KeyExtent ke = new KeyExtent(ts.extent);
+ int index = r.nextInt(underCapacityTServer.size());
+ TServerInstance dest = underCapacityTServer.get(index);
+ if (dest.equals(e.getKey()))
+ continue;
+ migrationsOut.add(new TabletMigration(ke, e.getKey(), dest));
+ if (numTablets.put(dest, numTablets.get(dest) + 1) > avg)
+ underCapacityTServer.remove(index);
+ if (numTablets.put(e.getKey(), numTablets.get(e.getKey()) - 1) <=
avg && !underCapacityTServer.contains(e.getKey()))
+ underCapacityTServer.add(e.getKey());
+
+ // We can get some craziness with only 1 tserver, so let's make
sure there's always an option!
+ if (underCapacityTServer.isEmpty())
+ underCapacityTServer.addAll(numTablets.keySet());
+ }
+ } catch (ThriftSecurityException e1) {
+ // Shouldn't happen, but carry on if it does
+ e1.printStackTrace();
+ } catch (TException e1) {
+ // Shouldn't happen, but carry on if it does
+ e1.printStackTrace();
+ }
+ }
+ }
+
+ return 100;
+ }
+
+ @Override
+ public void init(ServerConfiguration conf) {
+ super.init(conf);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/dedc9cdd/server/base/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
----------------------------------------------------------------------
diff --cc
server/base/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
index 1fcab46,0000000..9a970e7
mode 100644,000000..100644
---
a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
@@@ -1,319 -1,0 +1,329 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.balancer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletMigration;
+import org.apache.log4j.Logger;
+
+public class DefaultLoadBalancer extends TabletBalancer {
+
+ private static final Logger log =
Logger.getLogger(DefaultLoadBalancer.class);
+
+ Iterator<TServerInstance> assignments;
+ // if tableToBalance is set, then only balance the given table
+ String tableToBalance = null;
+
+ public DefaultLoadBalancer() {
+
+ }
+
+ public DefaultLoadBalancer(String table) {
+ tableToBalance = table;
+ }
+
+ List<TServerInstance> randomize(Set<TServerInstance> locations) {
+ List<TServerInstance> result = new ArrayList<TServerInstance>(locations);
+ Collections.shuffle(result);
+ return result;
+ }
+
+ public TServerInstance
getAssignment(SortedMap<TServerInstance,TabletServerStatus> locations,
KeyExtent extent, TServerInstance last) {
+ if (locations.size() == 0)
+ return null;
+
+ if (last != null) {
+ // Maintain locality
+ String fakeSessionID = " ";
+ TServerInstance simple = new TServerInstance(last.getLocation(),
fakeSessionID);
+ Iterator<TServerInstance> find =
locations.tailMap(simple).keySet().iterator();
+ if (find.hasNext()) {
+ TServerInstance current = find.next();
+ if (current.host().equals(last.host()))
+ return current;
+ }
+ }
+
+ // The strategy here is to walk through the locations and hand them back,
one at a time
+ // Grab an iterator off of the set of options; use a new iterator if it
hands back something not in the current list.
+ if (assignments == null || !assignments.hasNext())
+ assignments = randomize(locations.keySet()).iterator();
+ TServerInstance result = assignments.next();
+ if (!locations.containsKey(result)) {
+ assignments = null;
+ return randomize(locations.keySet()).iterator().next();
+ }
+ return result;
+ }
+
+ static class ServerCounts implements Comparable<ServerCounts> {
+ public final TServerInstance server;
+ public final int count;
+ public final TabletServerStatus status;
+
+ ServerCounts(int count, TServerInstance server, TabletServerStatus
status) {
+ this.count = count;
+ this.server = server;
+ this.status = status;
+ }
+
+ public int compareTo(ServerCounts obj) {
+ int result = count - obj.count;
+ if (result == 0)
+ return server.compareTo(obj.server);
+ return result;
+ }
+ }
+
+ public boolean getMigrations(Map<TServerInstance,TabletServerStatus>
current, List<TabletMigration> result) {
+ boolean moreBalancingNeeded = false;
+ try {
+ // no moves possible
+ if (current.size() < 2) {
+ return false;
+ }
+
+ // Sort by total number of online tablets, per server
+ int total = 0;
+ ArrayList<ServerCounts> totals = new ArrayList<ServerCounts>();
+ for (Entry<TServerInstance,TabletServerStatus> entry :
current.entrySet()) {
+ int serverTotal = 0;
+ if (entry.getValue() != null && entry.getValue().tableMap != null) {
+ for (Entry<String,TableInfo> e :
entry.getValue().tableMap.entrySet()) {
+ /**
+ * The check below was on entry.getKey(), but that resolves to a
tabletserver not a tablename. Believe it should be e.getKey() which is a
tablename
+ */
+ if (tableToBalance == null || tableToBalance.equals(e.getKey()))
+ serverTotal += e.getValue().onlineTablets;
+ }
+ }
+ totals.add(new ServerCounts(serverTotal, entry.getKey(),
entry.getValue()));
+ total += serverTotal;
+ }
+
+ // order from high to low: sort ascending, then reverse
+ Collections.sort(totals);
+ Collections.reverse(totals);
+ int even = total / totals.size();
+ int numServersOverEven = total % totals.size();
+
+ // Move tablets from the servers with too many to the servers with
+ // the fewest but only nominate tablets to move once. This allows us
+ // to fill new servers with tablets from a mostly balanced server
+ // very quickly. However, it may take several balancing passes to move
+ // tablets from one hugely overloaded server to many slightly
+ // under-loaded servers.
+ int end = totals.size() - 1;
+ int movedAlready = 0;
+ for (int tooManyIndex = 0; tooManyIndex < totals.size();
tooManyIndex++) {
+ ServerCounts tooMany = totals.get(tooManyIndex);
+ int goal = even;
+ if (tooManyIndex < numServersOverEven) {
+ goal++;
+ }
+ int needToUnload = tooMany.count - goal;
+ ServerCounts tooLittle = totals.get(end);
+ int needToLoad = goal - tooLittle.count - movedAlready;
+ if (needToUnload < 1 && needToLoad < 1) {
+ break;
+ }
+ if (needToUnload >= needToLoad) {
+ result.addAll(move(tooMany, tooLittle, needToLoad));
+ end--;
+ movedAlready = 0;
+ } else {
+ result.addAll(move(tooMany, tooLittle, needToUnload));
+ movedAlready += needToUnload;
+ }
+ if (needToUnload > needToLoad)
+ moreBalancingNeeded = true;
+ }
+
+ } finally {
+ log.debug("balance ended with " + result.size() + " migrations");
+ }
+ return moreBalancingNeeded;
+ }
+
+ static class TableDiff {
+ int diff;
+ String table;
+
+ public TableDiff(int diff, String table) {
+ this.diff = diff;
+ this.table = table;
+ }
+ };
+
+ /**
+ * Select a tablet based on differences between table loads; if the loads
are even, use the busiest table
+ */
+ List<TabletMigration> move(ServerCounts tooMuch, ServerCounts tooLittle,
int count) {
+
+ List<TabletMigration> result = new ArrayList<TabletMigration>();
+ if (count == 0)
+ return result;
+
+ Map<String,Map<KeyExtent,TabletStats>> onlineTablets = new
HashMap<String,Map<KeyExtent,TabletStats>>();
+ // Copy counts so we can update them as we propose migrations
+ Map<String,Integer> tooMuchMap = tabletCountsPerTable(tooMuch.status);
+ Map<String,Integer> tooLittleMap = tabletCountsPerTable(tooLittle.status);
+
+ for (int i = 0; i < count; i++) {
+ String table;
+ Integer tooLittleCount;
+ if (tableToBalance == null) {
+ // find a table to migrate
+ // look for an uneven table count
+ int biggestDifference = 0;
+ String biggestDifferenceTable = null;
+ for (Entry<String,Integer> tableEntry : tooMuchMap.entrySet()) {
+ String tableID = tableEntry.getKey();
+ if (tooLittleMap.get(tableID) == null)
+ tooLittleMap.put(tableID, 0);
+ int diff = tableEntry.getValue() - tooLittleMap.get(tableID);
+ if (diff > biggestDifference) {
+ biggestDifference = diff;
+ biggestDifferenceTable = tableID;
+ }
+ }
+ if (biggestDifference < 2) {
+ table = busiest(tooMuch.status.tableMap);
+ } else {
+ table = biggestDifferenceTable;
+ }
+ } else {
+ // just balance the given table
+ table = tableToBalance;
+ }
+ Map<KeyExtent,TabletStats> onlineTabletsForTable =
onlineTablets.get(table);
+ try {
+ if (onlineTabletsForTable == null) {
+ onlineTabletsForTable = new HashMap<KeyExtent,TabletStats>();
+ for (TabletStats stat : getOnlineTabletsForTable(tooMuch.server,
table))
+ onlineTabletsForTable.put(new KeyExtent(stat.extent), stat);
+ onlineTablets.put(table, onlineTabletsForTable);
+ }
+ } catch (Exception ex) {
+ log.error("Unable to select a tablet to move", ex);
+ return result;
+ }
+ KeyExtent extent = selectTablet(tooMuch.server, onlineTabletsForTable);
+ onlineTabletsForTable.remove(extent);
+ if (extent == null)
+ return result;
+ tooMuchMap.put(table, tooMuchMap.get(table) - 1);
+ /**
+ * If a table grows from 1 tablet then tooLittleMap.get(table) can
return a null, since there is only one tabletserver that holds all of the
tablets. Here
+ * we check to see if in fact that is the case and if so set the value
to 0.
+ */
+ tooLittleCount = tooLittleMap.get(table);
+ if (tooLittleCount == null) {
+ tooLittleCount = 0;
+ }
+ tooLittleMap.put(table, tooLittleCount + 1);
+
+ result.add(new TabletMigration(extent, tooMuch.server,
tooLittle.server));
+ }
+ return result;
+ }
+
+ static Map<String,Integer> tabletCountsPerTable(TabletServerStatus status) {
+ Map<String,Integer> result = new HashMap<String,Integer>();
+ if (status != null && status.tableMap != null) {
+ Map<String,TableInfo> tableMap = status.tableMap;
+ for (Entry<String,TableInfo> entry : tableMap.entrySet()) {
+ result.put(entry.getKey(), entry.getValue().onlineTablets);
+ }
+ }
+ return result;
+ }
+
+ static KeyExtent selectTablet(TServerInstance tserver,
Map<KeyExtent,TabletStats> extents) {
+ if (extents.size() == 0)
+ return null;
+ KeyExtent mostRecentlySplit = null;
+ long splitTime = 0;
+ for (Entry<KeyExtent,TabletStats> entry : extents.entrySet())
+ if (entry.getValue().splitCreationTime >= splitTime) {
+ splitTime = entry.getValue().splitCreationTime;
+ mostRecentlySplit = entry.getKey();
+ }
+ return mostRecentlySplit;
+ }
+
+ // define what it means for a table to be busy: the highest combined ingest and query rate
+ private static String busiest(Map<String,TableInfo> tables) {
+ String result = null;
+ double busiest = Double.NEGATIVE_INFINITY;
+ for (Entry<String,TableInfo> entry : tables.entrySet()) {
+ TableInfo info = entry.getValue();
+ double busy = info.ingestRate + info.queryRate;
+ if (busy > busiest) {
+ busiest = busy;
+ result = entry.getKey();
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void getAssignments(SortedMap<TServerInstance,TabletServerStatus>
current, Map<KeyExtent,TServerInstance> unassigned,
+ Map<KeyExtent,TServerInstance> assignments) {
+ for (Entry<KeyExtent,TServerInstance> entry : unassigned.entrySet()) {
+ assignments.put(entry.getKey(), getAssignment(current, entry.getKey(),
entry.getValue()));
+ }
+ }
-
++
++ private static final NoTservers NO_SERVERS = new NoTservers(log);
++
++ protected final OutstandingMigrations outstandingMigrations = new
OutstandingMigrations(log);
++
+ @Override
+ public long balance(SortedMap<TServerInstance,TabletServerStatus> current,
Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
+ // do we have any servers?
+ if (current.size() > 0) {
+ // Don't migrate if we have migrations in progress
+ if (migrations.size() == 0) {
++ resetBalancerErrors();
+ if (getMigrations(current, migrationsOut))
+ return 1 * 1000;
++ } else {
++ outstandingMigrations.migrations = migrations;
++ constraintNotMet(outstandingMigrations);
+ }
++ } else {
++ constraintNotMet(NO_SERVERS);
+ }
+ return 5 * 1000;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/dedc9cdd/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
----------------------------------------------------------------------
diff --cc
server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
index 5bd1632,0000000..fb97628
mode 100644,000000..100644
---
a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
@@@ -1,149 -1,0 +1,240 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.balancer;
+
+import java.util.ArrayList;
++import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+
++import com.google.common.collect.Iterables;
++
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import
org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletMigration;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+
+public abstract class TabletBalancer {
+
+ private static final Logger log = Logger.getLogger(TabletBalancer.class);
+
+ protected ServerConfiguration configuration;
+
+ /**
+ * Initialize the TabletBalancer. This gives the balancer the opportunity
to read the configuration.
+ */
+ public void init(ServerConfiguration conf) {
+ configuration = conf;
+ }
+
+ /**
+ * Assign tablets to tablet servers. This method is called whenever the
master finds tablets that are unassigned.
+ *
+ * @param current
+ * The current table-summary state of all the online tablet
servers. Read-only. The TabletServerStatus for each server may be null if the
tablet
+ * server has not yet responded to a recent request for status.
+ * @param unassigned
+ * A map from unassigned tablet to the last known tablet server.
Read-only.
+ * @param assignments
+ * A map from tablet to assigned server. Write-only.
+ */
+ abstract public void
getAssignments(SortedMap<TServerInstance,TabletServerStatus> current,
Map<KeyExtent,TServerInstance> unassigned,
+ Map<KeyExtent,TServerInstance> assignments);
+
+ /**
+ * Ask the balancer if any migrations are necessary.
++ *
++ * If the balancer is going to self-abort due to some environmental
constraint (e.g. it requires some minimum number of tservers, or a maximum
number
++ * of outstanding migrations), it should issue a log message to alert
operators. The message should be at WARN normally and at ERROR if the balancer
knows that the
++ * problem cannot self-correct. It should not issue these messages more
than once a minute. Subclasses can use the convenience methods {@link
#constraintNotMet(BalancerProblem)} and
++ * {@link #resetBalancerErrors()} to accomplish this logging.
+ *
+ * @param current
+ * The current table-summary state of all the online tablet
servers. Read-only.
+ * @param migrations
+ * the current set of migrations. Read-only.
+ * @param migrationsOut
+ * new migrations to perform; should not contain tablets in the
current set of migrations. Write-only.
+ * @return the time, in milliseconds, to wait before re-balancing.
+ *
+ * This method will not be called when there are unassigned tablets.
+ */
+ public abstract long balance(SortedMap<TServerInstance,TabletServerStatus>
current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut);
++
++ private static final long ONE_SECOND = 1000l;
++ private boolean stuck = false;
++ private long stuckNotificationTime = -1l;
++
++ protected static final long TIME_BETWEEN_BALANCER_WARNINGS = 60 *
ONE_SECOND;
++
++ /**
++ * A deferred call descendent TabletBalancers use to log why they can't
continue.
++ * The call is deferred so that TabletBalancer can limit how often messages
happen.
++ *
++ * Implementations should be reused as much as possible.
++ *
++ * Be sure to pass in a properly scoped Logger instance so that messages
indicate
++ * what part of the system is having trouble.
++ */
++ protected static abstract class BalancerProblem implements Runnable {
++ protected final Logger balancerLog;
++ public BalancerProblem(Logger logger) {
++ balancerLog = logger;
++ }
++ }
++
++ /**
++ * If a TabletBalancer requires active tservers, it should use this problem
to indicate when there are none.
++ * NoTservers is safe to share with anyone who uses the same Logger.
TabletBalancers should have a single
++ * static instance.
++ */
++ protected static class NoTservers extends BalancerProblem {
++ public NoTservers(Logger logger) {
++ super(logger);
++ }
++
++ @Override
++ public void run() {
++ balancerLog.warn("Not balancing because we don't have any tservers");
++ }
++ }
++
++ /**
++ * If a TabletBalancer only balances when there are no outstanding
migrations, it should use this problem
++ * to indicate when they exist.
++ *
++ * Iff a TabletBalancer makes use of the migrations member to provide
samples, then OutstandingMigrations
++ * is not thread safe.
++ */
++ protected static class OutstandingMigrations extends BalancerProblem {
++ public Set<KeyExtent> migrations = Collections.<KeyExtent>emptySet();
++
++ public OutstandingMigrations(Logger logger) {
++ super(logger);
++ }
++
++ @Override
++ public void run() {
++ balancerLog.warn("Not balancing due to " + migrations.size() + "
outstanding migrations.");
++ /* TODO ACCUMULO-2938 redact key extents in this output to avoid
leaking protected information. */
++ balancerLog.debug("Sample up to 10 outstanding migrations: " +
Iterables.limit(migrations, 10));
++ }
++ }
++
++ /**
++ * Warn that a Balancer can't work because of some external restriction.
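++ * The first call after {@link #resetBalancerErrors()} only records the time; the provided logging handler is
++ * called once the restriction has persisted longer than TIME_BETWEEN_BALANCER_WARNINGS, and no more often
++ * than that interval thereafter.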
++ */
++ protected void constraintNotMet(BalancerProblem cause) {
++ if (!stuck) {
++ stuck = true;
++ stuckNotificationTime = System.currentTimeMillis();
++ } else {
++ if ((System.currentTimeMillis() - stuckNotificationTime) >
TIME_BETWEEN_BALANCER_WARNINGS) {
++ cause.run();
++ stuckNotificationTime = System.currentTimeMillis();
++ }
++ }
++ }
++
++ /**
++ * Resets logging about problems meeting an external constraint on
balancing.
++ */
++ protected void resetBalancerErrors() {
++ stuck = false;
++ }
+
+ /**
+ * Fetch the tablets for the given table by asking the tablet server.
Useful if your balance strategy needs details at the tablet level to decide
what tablets
+ * to move.
+ *
+ * @param tserver
+ * The tablet server to ask.
+ * @param tableId
+ * The table id
+ * @return a list of tablet statistics, or null if a transport error prevented contacting the tablet server
+ * @throws ThriftSecurityException
+ * tablet server disapproves of your internal System password.
+ * @throws TException
+ * any other problem
+ */
+ public List<TabletStats> getOnlineTabletsForTable(TServerInstance tserver,
String tableId) throws ThriftSecurityException, TException {
+ log.debug("Scanning tablet server " + tserver + " for table " + tableId);
+ Client client = ThriftUtil.getClient(new
TabletClientService.Client.Factory(), tserver.getLocation(),
configuration.getConfiguration());
+ try {
+ List<TabletStats> onlineTabletsForTable =
client.getTabletStats(Tracer.traceInfo(),
SystemCredentials.get().toThrift(configuration.getInstance()),
+ tableId);
+ return onlineTabletsForTable;
+ } catch (TTransportException e) {
+ log.error("Unable to connect to " + tserver + ": " + e);
+ } finally {
+ ThriftUtil.returnClient(client);
+ }
+ return null;
+ }
+
+ /**
+ * Utility to ensure that the migrations from balance() are consistent:
+ * <ul>
+ * <li>Tablet objects are not null
+ * <li>Source and destination tablet servers are not null and current
+ * </ul>
+ *
+ * @return A list of TabletMigration object that passed sanity checks.
+ */
+ public static List<TabletMigration>
checkMigrationSanity(Set<TServerInstance> current, List<TabletMigration>
migrations) {
+ List<TabletMigration> result = new
ArrayList<TabletMigration>(migrations.size());
+ for (TabletMigration m : migrations) {
+ if (m.tablet == null) {
+ log.warn("Balancer gave back a null tablet " + m);
+ continue;
+ }
+ if (m.newServer == null) {
+ log.warn("Balancer did not set the destination " + m);
+ continue;
+ }
+ if (m.oldServer == null) {
+ log.warn("Balancer did not set the source " + m);
+ continue;
+ }
+ if (!current.contains(m.oldServer)) {
+ log.warn("Balancer wants to move a tablet from a server that is not
current: " + m);
+ continue;
+ }
+ if (!current.contains(m.newServer)) {
+ log.warn("Balancer wants to move a tablet to a server that is not
current: " + m);
+ continue;
+ }
+ result.add(m);
+ }
+ return result;
+ }
+
+}