Repository: accumulo Updated Branches: refs/heads/master 642b7a95f -> aaaca5240
ACCUMULO-3999 use a threadpool to fetch status from tablet servers Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/aaaca524 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/aaaca524 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/aaaca524 Branch: refs/heads/master Commit: aaaca5240b13f4ba029c8202a3c2f446f81fb8c5 Parents: 642b7a9 Author: Eric C. Newton <eric.new...@gmail.com> Authored: Mon Oct 19 10:38:51 2015 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Mon Oct 19 10:38:51 2015 -0400 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 2 + .../java/org/apache/accumulo/master/Master.java | 74 ++++++++++++-------- 2 files changed, 47 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/aaaca524/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 400577c..ab5c6d2 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -222,6 +222,8 @@ public enum Property { "Minimum number of threads dedicated to answering coordinator requests"), MASTER_REPLICATION_COORDINATOR_THREADCHECK("master.replication.coordinator.threadcheck.time", "5s", PropertyType.TIMEDURATION, "The time between adjustments of the coordinator thread pool"), + MASTER_STATUS_THREAD_POOL_SIZE("master.status.threadpool.size", "1", PropertyType.COUNT, + "The number of threads to use when fetching the tablet server status for balancing."), // properties that are specific to tablet server behavior TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the tablet servers"), http://git-wip-us.apache.org/repos/asf/accumulo/blob/aaaca524/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index ff4705e..152831d 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -33,6 +33,8 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -1049,40 +1051,54 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List private SortedMap<TServerInstance,TabletServerStatus> gatherTableInformation() { long start = System.currentTimeMillis(); - SortedMap<TServerInstance,TabletServerStatus> result = new TreeMap<TServerInstance,TabletServerStatus>(); + int threads = Math.max(getConfiguration().getCount(Property.MASTER_STATUS_THREAD_POOL_SIZE), 1); + ExecutorService tp = Executors.newFixedThreadPool(threads); + final SortedMap<TServerInstance,TabletServerStatus> result = new TreeMap<TServerInstance,TabletServerStatus>(); Set<TServerInstance> currentServers = tserverSet.getCurrentServers(); - for (TServerInstance server : currentServers) { - try { - Thread t = Thread.currentThread(); - String oldName = t.getName(); - try { - t.setName("Getting status from " + server); - TServerConnection connection = tserverSet.getConnection(server); - if (connection == null) - throw new IOException("No connection to " + server); - TabletServerStatus status = connection.getTableMap(false); - result.put(server, status); - } finally { - t.setName(oldName); - } - } catch (Exception ex) { - log.error("unable to get tablet server status " + server + " " + ex.toString()); - log.debug("unable to get tablet server status " + server, ex); - if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) { - log.warn("attempting to stop " + server); + for (TServerInstance serverInstance : currentServers) { + final TServerInstance server = serverInstance; + tp.submit(new Runnable() { + @Override + public void run() { try { - TServerConnection connection = tserverSet.getConnection(server); - if (connection != null) { - connection.halt(masterLock); + Thread t = Thread.currentThread(); + String oldName = t.getName(); + try { + t.setName("Getting status from " + server); + TServerConnection connection = tserverSet.getConnection(server); + if (connection == null) + throw new IOException("No connection to " + server); + TabletServerStatus status = connection.getTableMap(false); + result.put(server, status); + } finally { + t.setName(oldName); + } + } catch (Exception ex) { + log.error("unable to get tablet server status " + server + " " + ex.toString()); + log.debug("unable to get tablet server status " + server, ex); + if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) { + log.warn("attempting to stop " + server); + try { + TServerConnection connection = tserverSet.getConnection(server); + if (connection != null) { + connection.halt(masterLock); + } + } catch (TTransportException e) { + // ignore: it's probably down + } catch (Exception e) { + log.info("error talking to troublesome tablet server ", e); + } + badServers.remove(server); } - } catch (TTransportException e) { - // ignore: it's probably down - } catch (Exception e) { - log.info("error talking to troublesome tablet server ", e); } - badServers.remove(server); } - } + }); + } + tp.shutdown(); + try { + tp.awaitTermination(5, TimeUnit.MINUTES); + } catch (InterruptedException e) { + log.debug("Interrupted while fetching status"); } synchronized (badServers) { badServers.keySet().retainAll(currentServers);