ACCUMULO-378 Resize the threadpool used for sending data by checking the configuration periodically
Use the SimpleTimer to just schedule a check of the configuration to see what the value is for the maximum size of the threadpool which is used with the DistributedWorkQueue and ReplicationProcessor. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0ff0e021 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0ff0e021 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0ff0e021 Branch: refs/heads/ACCUMULO-378 Commit: 0ff0e021d3fc95794137dfeb3f6e1335b61b0a16 Parents: db10cfe Author: Josh Elser <els...@apache.org> Authored: Wed May 28 17:23:51 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Wed May 28 17:23:51 2014 -0400 ---------------------------------------------------------------------- .../org/apache/accumulo/tserver/TabletServer.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/0ff0e021/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 567b2ad..e4c7ef9 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -3241,10 +3241,24 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu log.info("Started replication service at " + replicationAddress); // Start the pool to handle outgoing replications - ThreadPoolExecutor replicationThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.REPLICATION_WORKER_THREADS), "replication task"); + final ThreadPoolExecutor replicationThreadPool = new 
SimpleThreadPool(getSystemConfiguration().getCount(Property.REPLICATION_WORKER_THREADS), "replication task"); replWorker.setExecutor(replicationThreadPool); replWorker.run(); + // Periodically check the configured size of the replication pool and resize the pool if the value has changed (scheduled below with delay 10000 and period 30000, presumably milliseconds) + final AccumuloConfiguration aconf = getSystemConfiguration(); + Runnable replicationWorkThreadPoolResizer = new Runnable() { + @Override + public void run() { + int maxPoolSize = aconf.getCount(Property.REPLICATION_WORKER_THREADS); + if (replicationThreadPool.getMaximumPoolSize() != maxPoolSize) { + log.info("Resizing thread pool for sending replication work from " + replicationThreadPool.getMaximumPoolSize() + " to " + maxPoolSize); + replicationThreadPool.setMaximumPoolSize(maxPoolSize); + } + } + }; + SimpleTimer.getInstance(aconf).schedule(replicationWorkThreadPoolResizer, 10000, 30000); + try { OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName()); // Do this because interface not in same package.