HBASE-6778 Deprecate Chore; its a thread per task when we should have one thread to do all tasks (Jonathan Lawlor)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/538388c2 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/538388c2 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/538388c2 Branch: refs/heads/master Commit: 538388c2b59cd56dd41a63dabc9eafd8f743ca0c Parents: c61c17b Author: stack <[email protected]> Authored: Thu Jan 29 19:26:26 2015 -0800 Committer: stack <[email protected]> Committed: Thu Jan 29 19:26:26 2015 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/ConnectionManager.java | 75 --------- .../java/org/apache/hadoop/hbase/AuthUtil.java | 12 +- .../java/org/apache/hadoop/hbase/Chore.java | 157 ------------------- .../apache/hadoop/hbase/rest/RESTServlet.java | 14 +- .../apache/hadoop/hbase/HealthCheckChore.java | 4 +- .../java/org/apache/hadoop/hbase/Server.java | 5 + .../hadoop/hbase/master/CatalogJanitor.java | 9 +- .../hbase/master/ClusterStatusPublisher.java | 37 ++--- .../org/apache/hadoop/hbase/master/HMaster.java | 27 ++-- .../hadoop/hbase/master/SplitLogManager.java | 23 +-- .../hbase/master/balancer/BalancerChore.java | 13 +- .../master/balancer/ClusterStatusChore.java | 13 +- .../hbase/master/cleaner/CleanerChore.java | 6 +- .../apache/hadoop/hbase/quotas/QuotaCache.java | 9 +- .../hbase/regionserver/HRegionServer.java | 113 ++++++------- .../hbase/regionserver/HeapMemoryManager.java | 25 +-- .../regionserver/RegionServerServices.java | 6 +- .../hbase/regionserver/ServerNonceManager.java | 15 +- .../regionserver/StorefileRefresherChore.java | 8 +- .../regionserver/ReplicationSyncUp.java | 6 + .../org/apache/hadoop/hbase/tool/Canary.java | 11 +- .../hadoop/hbase/util/ConnectionCache.java | 19 ++- .../hadoop/hbase/MockRegionServerServices.java | 7 +- .../hadoop/hbase/backup/TestHFileArchiving.java | 13 +- .../TestZooKeeperTableArchiveClient.java | 7 +- .../mapreduce/TestLoadIncrementalHFiles.java | 35 ++--- .../hadoop/hbase/master/MockRegionServer.java | 8 +- .../hbase/master/TestActiveMasterManager.java | 6 + .../hadoop/hbase/master/TestCatalogJanitor.java | 20 ++- .../hbase/master/TestClockSkewDetection.java | 6 + .../hbase/master/TestSplitLogManager.java | 6 + .../hbase/master/TestTableLockManager.java | 16 +- .../hbase/master/cleaner/TestHFileCleaner.java | 7 +- .../master/cleaner/TestHFileLinkCleaner.java | 15 +- .../hbase/master/cleaner/TestLogsCleaner.java | 6 + .../hbase/namespace/TestNamespaceAuditor.java | 2 +- .../TestEndToEndSplitTransaction.java | 11 +- .../regionserver/TestHeapMemoryManager.java | 24 ++- .../regionserver/TestServerNonceManager.java | 13 +- .../hbase/regionserver/TestSplitLogWorker.java | 6 + .../replication/TestReplicationStateZKImpl.java | 6 + .../TestReplicationTrackerZKImpl.java | 6 + .../TestReplicationSourceManager.java | 18 ++- .../security/token/TestTokenAuthentication.java | 6 + .../apache/hadoop/hbase/util/MockServer.java | 6 + 45 files changed, 381 insertions(+), 476 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index c5ddb54..dbd555c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -43,7 +43,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -55,7 +54,6 @@ import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.TableNotFoundException; @@ -579,8 +577,6 @@ final class ConnectionManager { private final Object masterAndZKLock = new Object(); private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE; - private final DelayedClosing delayedClosing = - DelayedClosing.createAndStart(this); // thread executor shared by all HTableInterface instances created // by this connection @@ -1387,7 +1383,6 @@ final class ConnectionManager { HConnection connection; MasterService.BlockingInterface stub; int userCount; - long keepAliveUntil = Long.MAX_VALUE; MasterServiceState(final HConnection connection) { super(); @@ -1633,71 +1628,6 @@ final class ConnectionManager { } } - /** - * Creates a Chore thread to check the connections to master & zookeeper - * and close them when they reach their closing time ( - * {@link MasterServiceState#keepAliveUntil} and - * {@link #keepZooKeeperWatcherAliveUntil}). Keep alive time is - * managed by the release functions and the variable {@link #keepAlive} - */ - private static final class DelayedClosing extends Chore implements Stoppable { - private HConnectionImplementation hci; - Stoppable stoppable; - - private DelayedClosing( - HConnectionImplementation hci, Stoppable stoppable){ - super( - "ZooKeeperWatcher and Master delayed closing for connection "+hci, - 60*1000, // We check every minutes - stoppable); - this.hci = hci; - this.stoppable = stoppable; - } - - static DelayedClosing createAndStart(HConnectionImplementation hci){ - Stoppable stoppable = new Stoppable() { - private volatile boolean isStopped = false; - @Override public void stop(String why) { isStopped = true;} - @Override public boolean isStopped() {return isStopped;} - }; - - return new DelayedClosing(hci, stoppable); - } - - protected void closeMasterProtocol(MasterServiceState protocolState) { - if (System.currentTimeMillis() > protocolState.keepAliveUntil) { - hci.closeMasterService(protocolState); - protocolState.keepAliveUntil = Long.MAX_VALUE; - } - } - - @Override - protected void chore() { - synchronized (hci.masterAndZKLock) { - if (hci.canCloseZKW) { - if (System.currentTimeMillis() > - hci.keepZooKeeperWatcherAliveUntil) { - - hci.closeZooKeeperWatcher(); - hci.keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE; - } - } - closeMasterProtocol(hci.masterServiceState); - closeMasterProtocol(hci.masterServiceState); - } - } - - @Override - public void stop(String why) { - stoppable.stop(why); - } - - @Override - public boolean isStopped() { - return stoppable.isStopped(); - } - } - private void closeZooKeeperWatcher() { synchronized (masterAndZKLock) { if (keepAliveZookeeper != null) { @@ -1720,7 +1650,6 @@ final class ConnectionManager { private void resetMasterServiceState(final MasterServiceState mss) { mss.userCount++; - mss.keepAliveUntil = Long.MAX_VALUE; } @Override @@ -2085,9 +2014,6 @@ final class ConnectionManager { if (mss.getStub() == null) return; synchronized (masterAndZKLock) { --mss.userCount; - if (mss.userCount <= 0) { - mss.keepAliveUntil = System.currentTimeMillis() + keepAlive; - } } } @@ -2408,7 +2334,6 @@ final class ConnectionManager { if (this.closed) { return; } - delayedClosing.stop("Closing connection"); closeMaster(); shutdownBatchPool(); this.closed = true; http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java index 282b5e3..f597935 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Strings; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.net.DNS; import org.apache.hadoop.security.UserGroupInformation; @@ -47,12 +46,12 @@ public class AuthUtil { /** * Checks if security is enabled and if so, launches chore for refreshing kerberos ticket. */ - public static void launchAuthChore(Configuration conf) throws IOException { + public static ScheduledChore getAuthChore(Configuration conf) throws IOException { UserProvider userProvider = UserProvider.instantiate(conf); // login the principal (if using secure Hadoop) boolean securityEnabled = userProvider.isHadoopSecurityEnabled() && userProvider.isHBaseSecurityEnabled(); - if (!securityEnabled) return; + if (!securityEnabled) return null; String host = null; try { host = Strings.domainNamePointerToHostName(DNS.getDefaultHost( @@ -87,7 +86,8 @@ public class AuthUtil { // e.g. 5min tgt * 0.8 = 4min refresh so interval is better be way less than 1min final int CHECK_TGT_INTERVAL = 30 * 1000; // 30sec - Chore refreshCredentials = new Chore("RefreshCredentials", CHECK_TGT_INTERVAL, stoppable) { + ScheduledChore refreshCredentials = + new ScheduledChore("RefreshCredentials", stoppable, CHECK_TGT_INTERVAL) { @Override protected void chore() { try { @@ -97,7 +97,7 @@ public class AuthUtil { } } }; - // Start the chore for refreshing credentials - Threads.setDaemonThreadRunning(refreshCredentials.getThread()); + + return refreshCredentials; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-common/src/main/java/org/apache/hadoop/hbase/Chore.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Chore.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Chore.java deleted file mode 100644 index 42d9d37..0000000 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Chore.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.HasThread; -import org.apache.hadoop.hbase.util.Sleeper; - -/** - * Chore is a task performed on a period in hbase. The chore is run in its own - * thread. This base abstract class provides while loop and sleeping facility. - * If an unhandled exception, the threads exit is logged. - * Implementers just need to add checking if there is work to be done and if - * so, do it. Its the base of most of the chore threads in hbase. - * - * <p>Don't subclass Chore if the task relies on being woken up for something to - * do, such as an entry being added to a queue, etc. - */ [email protected] -public abstract class Chore extends HasThread { - private final Log LOG = LogFactory.getLog(this.getClass()); - private final Sleeper sleeper; - private final Stoppable stopper; - - /** - * @param p Period at which we should run. Will be adjusted appropriately - * should we find work and it takes time to complete. - * @param stopper When {@link Stoppable#isStopped()} is true, this thread will - * cleanup and exit cleanly. - */ - public Chore(String name, final int p, final Stoppable stopper) { - super(name); - if (stopper == null){ - throw new NullPointerException("stopper cannot be null"); - } - this.sleeper = new Sleeper(p, stopper); - this.stopper = stopper; - } - - /** - * This constructor is for test only. It allows to create an object and to call chore() on - * it. There is no sleeper nor stoppable. - */ - protected Chore(){ - sleeper = null; - stopper = null; - } - - /** - * @return the sleep period in milliseconds - */ - public final int getPeriod() { - return sleeper.getPeriod(); - } - - /** - * @see java.lang.Thread#run() - */ - @Override - public void run() { - try { - boolean initialChoreComplete = false; - while (!this.stopper.isStopped()) { - long startTime = System.currentTimeMillis(); - try { - if (!initialChoreComplete) { - initialChoreComplete = initialChore(); - } else { - chore(); - } - } catch (Exception e) { - LOG.error("Caught exception", e); - if (this.stopper.isStopped()) { - continue; - } - } - this.sleeper.sleep(startTime); - } - } catch (Throwable t) { - LOG.fatal(getName() + "error", t); - } finally { - LOG.info(getName() + " exiting"); - cleanup(); - } - } - - /** - * If the thread is currently sleeping, trigger the core to happen immediately. - * If it's in the middle of its operation, will begin another operation - * immediately after finishing this one. - */ - public void triggerNow() { - this.sleeper.skipSleepCycle(); - } - - /* - * Exposed for TESTING! - * calls directly the chore method, from the current thread. - */ - public void choreForTesting() { - chore(); - } - - /** - * Override to run a task before we start looping. - * @return true if initial chore was successful - */ - protected boolean initialChore() { - // Default does nothing. - return true; - } - - /** - * Look for chores. If any found, do them else just return. - */ - protected abstract void chore(); - - /** - * Sleep for period. - */ - protected void sleep() { - this.sleeper.sleep(); - } - - /** - * Called when the chore has completed, allowing subclasses to cleanup any - * extra overhead - */ - protected void cleanup() { - } - - protected Stoppable getStopper() { - return stopper; - } - - protected Sleeper getSleeper() { - return sleeper; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java index ff42271..bb93bc8 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java @@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.rest; import java.io.IOException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.ParseFilter; @@ -74,7 +74,10 @@ public class RESTServlet implements Constants { } public synchronized static void stop() { - if (INSTANCE != null) INSTANCE = null; + if (INSTANCE != null) { + INSTANCE.shutdown(); + INSTANCE = null; + } } /** @@ -130,6 +133,13 @@ public class RESTServlet implements Constants { connectionCache.setEffectiveUser(effectiveUser); } + /** + * Shutdown any services that need to stop + */ + void shutdown() { + if (connectionCache != null) connectionCache.shutdown(); + } + boolean supportsProxyuser() { return conf.getBoolean(HBASE_REST_SUPPORT_PROXYUSER, false); } http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java index 8d65c66..e729ec8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java @@ -28,7 +28,7 @@ import org.apache.hadoop.util.StringUtils; /** * The Class HealthCheckChore for running health checker regularly. */ - public class HealthCheckChore extends Chore { +public class HealthCheckChore extends ScheduledChore { private static Log LOG = LogFactory.getLog(HealthCheckChore.class); private HealthChecker healthChecker; private Configuration config; @@ -38,7 +38,7 @@ import org.apache.hadoop.util.StringUtils; private long startWindow; public HealthCheckChore(int sleepTime, Stoppable stopper, Configuration conf) { - super("HealthChecker", sleepTime, stopper); + super("HealthChecker", stopper, sleepTime); LOG.info("Health Check Chore runs every " + StringUtils.formatTime(sleepTime)); this.config = conf; String healthCheckScript = this.config.get(HConstants.HEALTH_SCRIPT_LOC); http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java index 6b79f80..85f8471 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java @@ -65,4 +65,9 @@ public interface Server extends Abortable, Stoppable { * Get CoordinatedStateManager instance for this server. */ CoordinatedStateManager getCoordinatedStateManager(); + + /** + * @return The {@link ChoreService} instance for this server + */ + ChoreService getChoreService(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java index 9f71b90..9d18c98 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java @@ -31,12 +31,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.HFileArchiver; @@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.util.Triple; * table on a period looking for unused regions to garbage collect. */ @InterfaceAudience.Private -public class CatalogJanitor extends Chore { +public class CatalogJanitor extends ScheduledChore { private static final Log LOG = LogFactory.getLog(CatalogJanitor.class.getName()); private final Server server; private final MasterServices services; @@ -66,9 +66,8 @@ public class CatalogJanitor extends Chore { private final Connection connection; CatalogJanitor(final Server server, final MasterServices services) { - super("CatalogJanitor-" + server.getServerName().toShortString(), - server.getConfiguration().getInt("hbase.catalogjanitor.interval", 300000), - server); + super("CatalogJanitor-" + server.getServerName().toShortString(), server, server + .getConfiguration().getInt("hbase.catalogjanitor.interval", 300000)); this.server = server; this.services = services; this.connection = server.getConnection(); http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java index 6e7024c..e90aae6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java @@ -35,22 +35,7 @@ import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.handler.codec.MessageToMessageEncoder; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import io.netty.util.internal.StringUtil; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Chore; -import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos; -import org.apache.hadoop.hbase.util.Addressing; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.ExceptionUtil; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.ReflectionUtils; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.util.VersionInfo; import java.io.Closeable; import java.io.IOException; @@ -67,6 +52,22 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos; +import org.apache.hadoop.hbase.util.Addressing; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ExceptionUtil; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.VersionInfo; + /** * Class to publish the cluster status to the client. This allows them to know immediately @@ -75,7 +76,7 @@ import java.util.concurrent.ConcurrentMap; * on the client the different timeouts, as the dead servers will be detected separately. */ @InterfaceAudience.Private -public class ClusterStatusPublisher extends Chore { +public class ClusterStatusPublisher extends ScheduledChore { /** * The implementation class used to publish the status. Default is null (no publish). * Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast the @@ -115,8 +116,8 @@ public class ClusterStatusPublisher extends Chore { public ClusterStatusPublisher(HMaster master, Configuration conf, Class<? extends Publisher> publisherClass) throws IOException { - super("HBase clusterStatusPublisher for " + master.getName(), - conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD), master); + super("HBase clusterStatusPublisher for " + master.getName(), master, conf.getInt( + STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD)); this.master = master; this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD); try { http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index a539489..020d6fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -384,7 +384,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { " is not set - not publishing status"); } else { clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass); - Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread()); + getChoreService().scheduleChore(clusterStatusPublisherChore); } } activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this); @@ -726,11 +726,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server { // been assigned. status.setStatus("Starting balancer and catalog janitor"); this.clusterStatusChore = new ClusterStatusChore(this, balancer); - Threads.setDaemonThreadRunning(clusterStatusChore.getThread()); + getChoreService().scheduleChore(clusterStatusChore); this.balancerChore = new BalancerChore(this); - Threads.setDaemonThreadRunning(balancerChore.getThread()); + getChoreService().scheduleChore(balancerChore); this.catalogJanitorChore = new CatalogJanitor(this, this); - Threads.setDaemonThreadRunning(catalogJanitorChore.getThread()); + getChoreService().scheduleChore(catalogJanitorChore); status.setStatus("Starting namespace manager"); initNamespace(); @@ -1013,16 +1013,13 @@ public class HMaster extends HRegionServer implements MasterServices, Server { new LogCleaner(cleanerInterval, this, conf, getMasterFileSystem().getFileSystem(), getMasterFileSystem().getOldLogDir()); - Threads.setDaemonThreadRunning(logCleaner.getThread(), - getServerName().toShortString() + ".oldLogCleaner"); + getChoreService().scheduleChore(logCleaner); //start the hfile archive cleaner thread Path archiveDir = HFileArchiveUtil.getArchivePath(conf); this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem() .getFileSystem(), archiveDir); - Threads.setDaemonThreadRunning(hfileCleaner.getThread(), - getServerName().toShortString() + ".archivedHFileCleaner"); - + getChoreService().scheduleChore(hfileCleaner); serviceStarted = true; if (LOG.isTraceEnabled()) { LOG.trace("Started service threads"); @@ -1051,8 +1048,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server { LOG.debug("Stopping service threads"); } // Clean up and close up shop - if (this.logCleaner!= null) this.logCleaner.interrupt(); - if (this.hfileCleaner != null) this.hfileCleaner.interrupt(); + if (this.logCleaner != null) this.logCleaner.cancel(true); + if (this.hfileCleaner != null) this.hfileCleaner.cancel(true); if (this.quotaManager != null) this.quotaManager.stop(); if (this.activeMasterManager != null) this.activeMasterManager.stop(); if (this.serverManager != null) this.serverManager.stop(); @@ -1063,16 +1060,16 @@ public class HMaster extends HRegionServer implements MasterServices, Server { private void stopChores() { if (this.balancerChore != null) { - this.balancerChore.interrupt(); + this.balancerChore.cancel(true); } if (this.clusterStatusChore != null) { - this.clusterStatusChore.interrupt(); + this.clusterStatusChore.cancel(true); } if (this.catalogJanitorChore != null) { - this.catalogJanitorChore.interrupt(); + this.catalogJanitorChore.cancel(true); } if (this.clusterStatusPublisherChore != null){ - clusterStatusPublisherChore.interrupt(); + clusterStatusPublisherChore.cancel(true); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index bc798cd..8a7a362 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -39,30 +39,31 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination; import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.WALFactory; import com.google.common.annotations.VisibleForTesting; @@ -103,6 +104,7 @@ public class SplitLogManager { private final Stoppable stopper; private final Configuration conf; + private final ChoreService choreService; public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min @@ -139,6 +141,7 @@ public class SplitLogManager { this.server = server; this.conf = conf; this.stopper = stopper; + this.choreService = new ChoreService(serverName.toString() + "_splitLogManager_"); if (server.getCoordinatedStateManager() != null) { SplitLogManagerCoordination coordination = ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) @@ -155,8 +158,7 @@ public class SplitLogManager { this.timeoutMonitor = new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper); - Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName - + ".splitLogManagerTimeoutMonitor"); + choreService.scheduleChore(timeoutMonitor); } private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException { @@ -529,8 +531,11 @@ public class SplitLogManager { } public void stop() { + if (choreService != null) { + choreService.shutdown(); + } if (timeoutMonitor != null) { - timeoutMonitor.interrupt(); + timeoutMonitor.cancel(true); } } @@ -684,11 +689,11 @@ public class SplitLogManager { /** * Periodically checks all active tasks and resubmits the ones that have timed out */ - private class TimeoutMonitor extends Chore { + private class TimeoutMonitor extends ScheduledChore { private long lastLog = 0; public TimeoutMonitor(final int period, Stoppable stopper) { - super("SplitLogManager Timeout Monitor", period, stopper); + super("SplitLogManager Timeout Monitor", stopper, period); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerChore.java index 21fe272..bbbfdf2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerChore.java @@ -18,28 +18,27 @@ package org.apache.hadoop.hbase.master.balancer; +import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.master.HMaster; -import java.io.IOException; - /** * Chore that will call HMaster.balance{@link org.apache.hadoop.hbase.master.HMaster#balance()} when * needed. */ @InterfaceAudience.Private -public class BalancerChore extends Chore { +public class BalancerChore extends ScheduledChore { private static final Log LOG = LogFactory.getLog(BalancerChore.class); private final HMaster master; public BalancerChore(HMaster master) { - super(master.getServerName() + "-BalancerChore", - master.getConfiguration().getInt("hbase.balancer.period", 300000), - master); + super(master.getServerName() + "-BalancerChore", master, master.getConfiguration().getInt( + "hbase.balancer.period", 300000)); this.master = master; } http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterStatusChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterStatusChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterStatusChore.java index 046dfb7..58e5808 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterStatusChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterStatusChore.java @@ -18,28 +18,27 @@ package org.apache.hadoop.hbase.master.balancer; +import java.io.InterruptedIOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.LoadBalancer; -import java.io.InterruptedIOException; - /** * Chore that will feed the balancer the cluster status. */ @InterfaceAudience.Private -public class ClusterStatusChore extends Chore { +public class ClusterStatusChore extends ScheduledChore { private static final Log LOG = LogFactory.getLog(ClusterStatusChore.class); private final HMaster master; private final LoadBalancer balancer; public ClusterStatusChore(HMaster master, LoadBalancer balancer) { - super(master.getServerName() + "-ClusterStatusChore", - master.getConfiguration().getInt("hbase.balancer.statusPeriod", 60000), - master); + super(master.getServerName() + "-ClusterStatusChore", master, master.getConfiguration().getInt( + "hbase.balancer.statusPeriod", 60000)); this.master = master; this.balancer = balancer; } http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java index 294131e..05a5a9e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java @@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.ipc.RemoteException; @@ -41,7 +41,7 @@ import com.google.common.collect.Lists; * Abstract Cleaner that uses a chain of delegates to clean a directory of files * @param <T> Cleaner delegate class that is dynamically loaded from configuration */ -public abstract class CleanerChore<T extends FileCleanerDelegate> extends Chore { +public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore { private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName()); @@ -61,7 +61,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Chore */ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, FileSystem fs, Path oldFileDir, String confKey) { - super(name, sleepPeriod, s); + super(name, s, sleepPeriod); this.fs = fs; this.oldFileDir = oldFileDir; this.conf = conf; http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java index 8cd402d..15962d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java @@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.security.UserGroupInformation; import com.google.common.annotations.VisibleForTesting; @@ -86,7 +85,7 @@ public class QuotaCache implements Stoppable { Configuration conf = rsServices.getConfiguration(); int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD); refreshChore = new QuotaRefresherChore(period, this); - Threads.setDaemonThreadRunning(refreshChore.getThread()); + rsServices.getChoreService().scheduleChore(refreshChore); } @Override @@ -198,11 +197,11 @@ public class QuotaCache implements Stoppable { } // TODO: Remove this once we have the notification bus - private class QuotaRefresherChore extends Chore { + private class QuotaRefresherChore extends ScheduledChore { private long lastUpdate = 0; public QuotaRefresherChore(final int period, final Stoppable stoppable) { - super("QuotaRefresherChore", period, stoppable); + super("QuotaRefresherChore", stoppable, period); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index bc52eb8..409107f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -25,6 +25,7 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryUsage; import java.lang.reflect.Constructor; import java.net.BindException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; @@ -46,7 +47,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.net.InetAddress; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; @@ -58,7 +58,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.ClockOutOfSyncException; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; @@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HealthCheckChore; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableDescriptors; @@ -76,10 +77,10 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.ZNodeClearer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.conf.ConfigurationManager; -import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; @@ -127,10 +128,7 @@ import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; -import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; -import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.trace.SpanReceiverHost; @@ -147,6 +145,9 @@ import org.apache.hadoop.hbase.util.JvmPauseMonitor; import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @@ -325,15 +326,20 @@ public class HRegionServer extends HasThread implements MetricsRegionServer metricsRegionServer; private SpanReceiverHost spanReceiverHost; + /** + * ChoreService used to schedule tasks that we want to run periodically + */ + private final ChoreService choreService; + /* * Check for compactions requests. */ - Chore compactionChecker; + ScheduledChore compactionChecker; /* * Check for flushes */ - Chore periodicFlusher; + ScheduledChore periodicFlusher; protected volatile WALFactory walFactory; @@ -372,7 +378,7 @@ public class HRegionServer extends HasThread implements private HealthCheckChore healthCheckChore; /** The nonce manager chore. */ - private Chore nonceManagerChore; + private ScheduledChore nonceManagerChore; private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap(); @@ -552,6 +558,7 @@ public class HRegionServer extends HasThread implements rpcServices.start(); putUpWebUI(); this.walRoller = new LogRoller(this, this); + this.choreService = new ChoreService(getServerName().toString()); } protected void login(UserProvider user, String host) throws IOException { @@ -779,8 +786,8 @@ public class HRegionServer extends HasThread implements movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this); if (this.nonceManager != null) { - // Create the chore that cleans up nonces. - nonceManagerChore = this.nonceManager.createCleanupChore(this); + // Create the scheduled chore that cleans up nonces. + nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); } // Setup the Quota Manager @@ -935,17 +942,10 @@ public class HRegionServer extends HasThread implements if(this.hMemManager != null) this.hMemManager.stop(); if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary(); if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary(); - if (this.compactionChecker != null) - this.compactionChecker.interrupt(); - if (this.healthCheckChore != null) { - this.healthCheckChore.interrupt(); - } - if (this.nonceManagerChore != null) { - this.nonceManagerChore.interrupt(); - } - if (this.storefileRefresher != null) { - this.storefileRefresher.interrupt(); - } + if (this.compactionChecker != null) this.compactionChecker.cancel(true); + if (this.healthCheckChore != null) this.healthCheckChore.cancel(true); + if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true); + if (this.storefileRefresher != null) this.storefileRefresher.cancel(true); // Stop the quota manager if (rsQuotaManager != null) { @@ -1306,7 +1306,7 @@ public class HRegionServer extends HasThread implements private void startHeapMemoryManager() { this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this); if (this.hMemManager != null) { - this.hMemManager.start(); + this.hMemManager.start(getChoreService()); } } @@ -1421,7 +1421,7 @@ public class HRegionServer extends HasThread implements /* * Inner class that runs on a long period checking if regions need compaction. */ - private static class CompactionChecker extends Chore { + private static class CompactionChecker extends ScheduledChore { private final HRegionServer instance; private final int majorCompactPriority; private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE; @@ -1429,7 +1429,7 @@ public class HRegionServer extends HasThread implements CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { - super("CompactionChecker", sleepTime, h); + super("CompactionChecker", stopper, sleepTime); this.instance = h; LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime)); @@ -1475,12 +1475,12 @@ public class HRegionServer extends HasThread implements } } - class PeriodicMemstoreFlusher extends Chore { + class PeriodicMemstoreFlusher extends ScheduledChore { final HRegionServer server; final static int RANGE_OF_DELAY = 20000; //millisec final static int MIN_DELAY_TIME = 3000; //millisec public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) { - super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server); + super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval); this.server = server; } @@ -1621,22 +1621,12 @@ public class HRegionServer extends HasThread implements Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", uncaughtExceptionHandler); this.cacheFlusher.start(uncaughtExceptionHandler); - Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), getName() + - ".compactionChecker", uncaughtExceptionHandler); - Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), getName() + - ".periodicFlusher", uncaughtExceptionHandler); - if (this.healthCheckChore != null) { - Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), getName() + ".healthChecker", - uncaughtExceptionHandler); - } - if (this.nonceManagerChore != null) { - Threads.setDaemonThreadRunning(this.nonceManagerChore.getThread(), getName() + ".nonceCleaner", - uncaughtExceptionHandler); - } - if (this.storefileRefresher != null) { - Threads.setDaemonThreadRunning(this.storefileRefresher.getThread(), getName() + ".storefileRefresher", - uncaughtExceptionHandler); - } + + if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker); + if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher); + if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore); + if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore); + if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher); // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. @@ -1729,8 +1719,8 @@ public class HRegionServer extends HasThread implements // Verify that all threads are alive if (!(leases.isAlive() && cacheFlusher.isAlive() && walRoller.isAlive() - && this.compactionChecker.isAlive() - && this.periodicFlusher.isAlive())) { + && this.compactionChecker.isScheduled() + && this.periodicFlusher.isScheduled())) { stop("One or more threads are no longer alive -- stop"); return false; } @@ -1994,21 +1984,18 @@ public class HRegionServer extends HasThread implements * have already been called. */ protected void stopServiceThreads() { - if (this.nonceManagerChore != null) { - Threads.shutdown(this.nonceManagerChore.getThread()); - } - if (this.compactionChecker != null) { - Threads.shutdown(this.compactionChecker.getThread()); - } - if (this.periodicFlusher != null) { - Threads.shutdown(this.periodicFlusher.getThread()); - } + // clean up the scheduled chores + if (this.choreService != null) choreService.shutdown(); + if (this.nonceManagerChore != null) nonceManagerChore.cancel(true); + if (this.compactionChecker != null) compactionChecker.cancel(true); + if (this.periodicFlusher != null) periodicFlusher.cancel(true); + if (this.healthCheckChore != null) healthCheckChore.cancel(true); + if (this.storefileRefresher != null) storefileRefresher.cancel(true); + if (this.cacheFlusher != null) { this.cacheFlusher.join(); } - if (this.healthCheckChore != null) { - Threads.shutdown(this.healthCheckChore.getThread()); - } + if (this.spanReceiverHost != null) { this.spanReceiverHost.closeReceivers(); } @@ -2034,9 +2021,6 @@ public class HRegionServer extends HasThread implements this.replicationSinkHandler.stopReplicationService(); } } - if (this.storefileRefresher != null) { - Threads.shutdown(this.storefileRefresher.getThread()); - } } /** @@ -2436,6 +2420,11 @@ public class HRegionServer extends HasThread implements } @Override + public ChoreService getChoreService() { + return choreService; + } + + @Override public RegionServerQuotaManager getRegionServerQuotaManager() { return rsQuotaManager; } @@ -2944,13 +2933,13 @@ public class HRegionServer extends HasThread implements /** * Creates a Chore thread to clean the moved region cache. */ - protected static class MovedRegionsCleaner extends Chore implements Stoppable { + protected static class MovedRegionsCleaner extends ScheduledChore implements Stoppable { private HRegionServer regionServer; Stoppable stoppable; private MovedRegionsCleaner( HRegionServer regionServer, Stoppable stoppable){ - super("MovedRegionsCleaner for region "+regionServer, TIMEOUT_REGION_MOVED, stoppable); + super("MovedRegionsCleaner for region " + regionServer, stoppable, TIMEOUT_REGION_MOVED); this.regionServer = regionServer; this.stoppable = stoppable; } http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java index 112634e..43deb58 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java @@ -26,16 +26,16 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache; import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.util.ReflectionUtils; import com.google.common.annotations.VisibleForTesting; @@ -183,11 +183,11 @@ public class HeapMemoryManager { return true; } - public void start() { - LOG.info("Starting HeapMemoryTuner chore."); - this.heapMemTunerChore = new HeapMemoryTunerChore(); - Threads.setDaemonThreadRunning(heapMemTunerChore.getThread()); - if (tunerOn) { + public void start(ChoreService service) { + LOG.info("Starting HeapMemoryTuner chore."); + this.heapMemTunerChore = new HeapMemoryTunerChore(); + service.scheduleChore(heapMemTunerChore); + if (tunerOn) { // Register HeapMemoryTuner as a memstore flush listener memStoreFlusher.registerFlushRequestListener(heapMemTunerChore); } @@ -196,7 +196,8 @@ public class HeapMemoryManager { public void stop() { // The thread is Daemon. Just interrupting the ongoing process. LOG.info("Stoping HeapMemoryTuner chore."); - this.heapMemTunerChore.interrupt(); + this.heapMemTunerChore.cancel(true); + } // Used by the test cases. @@ -211,7 +212,7 @@ public class HeapMemoryManager { return this.heapOccupancyPercent; } - private class HeapMemoryTunerChore extends Chore implements FlushRequestListener { + private class HeapMemoryTunerChore extends ScheduledChore implements FlushRequestListener { private HeapMemoryTuner heapMemTuner; private AtomicLong blockedFlushCount = new AtomicLong(); private AtomicLong unblockedFlushCount = new AtomicLong(); @@ -220,7 +221,7 @@ public class HeapMemoryManager { private boolean alarming = false; public HeapMemoryTunerChore() { - super(server.getServerName() + "-HeapMemoryTunerChore", defaultChorePeriod, server); + super(server.getServerName() + "-HeapMemoryTunerChore", server, defaultChorePeriod); Class<? extends HeapMemoryTuner> tunerKlass = server.getConfiguration().getClass( HBASE_RS_HEAP_MEMORY_TUNER_CLASS, DefaultHeapMemoryTuner.class, HeapMemoryTuner.class); heapMemTuner = ReflectionUtils.newInstance(tunerKlass, server.getConfiguration()); @@ -239,7 +240,7 @@ public class HeapMemoryManager { " is above heap occupancy alarm watermark (" + heapOccupancyLowWatermark + ")"); alarming = true; } - getSleeper().skipSleepCycle(); + triggerNow(); try { // Need to sleep ourselves since we've told the chore's sleeper // to skip the next sleep cycle. http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index 3565195..5e0905a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -18,17 +18,15 @@ */ package org.apache.hadoop.hbase.regionserver; -import com.google.protobuf.Service; - import java.io.IOException; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.master.TableLockManager; @@ -37,6 +35,8 @@ import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; import org.apache.hadoop.hbase.wal.WAL; import org.apache.zookeeper.KeeperException; +import com.google.protobuf.Service; + /** * Services provided by {@link HRegionServer} */ http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java index 128cb6f..0d974b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java @@ -18,20 +18,19 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.util.Date; import java.text.SimpleDateFormat; +import java.util.Date; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; - import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import com.google.common.annotations.VisibleForTesting; @@ -247,13 +246,13 @@ public class ServerNonceManager { } /** - * Creates a chore that is used to clean up old nonces. + * Creates a scheduled chore that is used to clean up old nonces. * @param stoppable Stoppable for the chore. - * @return Chore; the chore is not started. + * @return ScheduledChore; the scheduled chore is not started. */ - public Chore createCleanupChore(Stoppable stoppable) { + public ScheduledChore createCleanupScheduledChore(Stoppable stoppable) { // By default, it will run every 6 minutes (30 / 5). - return new Chore("nonceCleaner", deleteNonceGracePeriod / 5, stoppable) { + return new ScheduledChore("nonceCleaner", stoppable, deleteNonceGracePeriod / 5) { @Override protected void chore() { cleanUpOldNonces(); http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java index f3c7e6a..4918391 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java @@ -25,9 +25,9 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.StringUtils; @@ -42,7 +42,7 @@ import org.apache.hadoop.util.StringUtils; * primary region). */ @InterfaceAudience.Private -public class StorefileRefresherChore extends Chore { +public class StorefileRefresherChore extends ScheduledChore { private static final Log LOG = LogFactory.getLog(StorefileRefresherChore.class); @@ -69,7 +69,7 @@ public class StorefileRefresherChore extends Chore { public StorefileRefresherChore(int period, boolean onlyMetaRefresh, HRegionServer regionServer, Stoppable stoppable) { - super("StorefileRefresherChore", period, stoppable); + super("StorefileRefresherChore", stoppable, period); this.period = period; this.regionServer = regionServer; this.hfileTtl = this.regionServer.getConfiguration().getLong( http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index 997692f..4b997b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -181,5 +182,10 @@ public class ReplicationSyncUp extends Configured implements Tool { public ClusterConnection getConnection() { return null; } + + @Override + public ChoreService getChoreService() { + return null; + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java index 23673b6..309a1c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java @@ -36,12 +36,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AuthUtil; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; @@ -773,8 +775,15 @@ public final class Canary implements Tool { public static void main(String[] args) throws Exception { final Configuration conf = HBaseConfiguration.create(); - AuthUtil.launchAuthChore(conf); + final ChoreService choreService = new ChoreService("CANARY_TOOL"); + final ScheduledChore authChore = AuthUtil.getAuthChore(conf); + if (authChore != null) { + choreService.scheduleChore(authChore); + } + int exitCode = ToolRunner.run(conf, new Canary(), args); + + choreService.shutdown(); System.exit(exitCode); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java index fc4e74c..21714af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java @@ -23,10 +23,11 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; @@ -53,6 +54,7 @@ public class ConnectionCache { private final UserGroupInformation realUser; private final UserProvider userProvider; private final Configuration conf; + private final ChoreService choreService; private final ThreadLocal<String> effectiveUserNames = new ThreadLocal<String>() { @@ -69,8 +71,8 @@ public class ConnectionCache { @Override public void stop(String why) { isStopped = true;} @Override public boolean isStopped() {return isStopped;} }; - - Chore cleaner = new Chore("ConnectionCleaner", cleanInterval, stoppable) { + this.choreService = new ChoreService("ConnectionCache"); + ScheduledChore cleaner = new ScheduledChore("ConnectionCleaner", stoppable, cleanInterval) { @Override protected void chore() { for (Map.Entry<String, ConnectionInfo> entry: connections.entrySet()) { @@ -93,7 +95,7 @@ public class ConnectionCache { } }; // Start the daemon cleaner chore - Threads.setDaemonThreadRunning(cleaner.getThread()); + choreService.scheduleChore(cleaner); this.realUser = userProvider.getCurrent().getUGI(); this.realUserName = realUser.getShortUserName(); this.userProvider = userProvider; @@ -115,6 +117,13 @@ public class ConnectionCache { } /** + * Called when cache is no longer needed so that it can perform cleanup operations + */ + public void shutdown() { + if (choreService != null) choreService.shutdown(); + } + + /** * Caller doesn't close the admin afterwards. * We need to manage it and close it properly. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index e6e98f2..fa156be 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -46,8 +46,8 @@ import org.apache.hadoop.hbase.regionserver.Leases; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; -import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -243,6 +243,11 @@ public class MockRegionServerServices implements RegionServerServices { } @Override + public ChoreService getChoreService() { + return null; + } + + @Override public void updateRegionFavoredNodesMapping(String encodedRegionName, List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) { } http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java index 8af6016..903ce0e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java @@ -33,10 +33,9 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; @@ -45,6 +44,8 @@ import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil; @@ -77,7 +78,7 @@ public class TestHFileArchiving { UTIL.startMiniCluster(); // We don't want the cleaner to remove files. The tests do that. - UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().interrupt(); + UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().cancel(true); } private static void setupConf(Configuration conf) { @@ -351,6 +352,7 @@ public class TestHFileArchiving { @Test public void testCleaningRace() throws Exception { final long TEST_TIME = 20 * 1000; + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration(); Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace"); @@ -369,7 +371,7 @@ public class TestHFileArchiving { // The cleaner should be looping without long pauses to reproduce the race condition. HFileCleaner cleaner = new HFileCleaner(1, stoppable, conf, fs, archiveDir); try { - cleaner.start(); + choreService.scheduleChore(cleaner); // Keep creating/archiving new files while the cleaner is running in the other thread long startTime = System.currentTimeMillis(); @@ -404,7 +406,8 @@ public class TestHFileArchiving { } } finally { stoppable.stop("test end"); - cleaner.join(); + cleaner.cancel(true); + choreService.shutdown(); fs.delete(rootDir, true); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java index 1757804..772c345 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java @@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -212,6 +213,7 @@ public class TestZooKeeperTableArchiveClient { Configuration conf = UTIL.getConfiguration(); // setup the delegate Stoppable stop = new StoppableImplementation(); + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop); List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner); final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0); @@ -250,7 +252,7 @@ public class TestZooKeeperTableArchiveClient { // need to be checked) in 'otherTable' and the files (which should be retained) in the 'table' CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size() + 3); // run the cleaner - cleaner.start(); + choreService.scheduleChore(cleaner); // wait for the cleaner to check all the files finished.await(); // stop the cleaner @@ -412,8 +414,9 @@ public class TestZooKeeperTableArchiveClient { */ private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop) throws InterruptedException { + final ChoreService choreService = new ChoreService("CLEANER_SERVER_NAME"); // run the cleaner - cleaner.start(); + choreService.scheduleChore(cleaner); // wait for the cleaner to check all the files finished.await(); // stop the cleaner http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java index d68d55c..813f374 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java @@ -33,25 +33,24 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.MapReduceTests; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.security.SecureBulkLoadUtil; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HFileTestUtil; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.apache.hadoop.hbase.security.SecureBulkLoadUtil; /** * Test cases for the "load" half of the HFileOutputFormat bulk load @@ -98,7 +97,7 @@ public class TestLoadIncrementalHFiles { * Test case that creates some regions and loads * HFiles that fit snugly inside those regions */ - @Test + @Test(timeout = 60000) public void testSimpleLoad() throws Exception { runTest("testSimpleLoad", BloomType.NONE, new byte[][][] { @@ -111,7 +110,7 @@ public class TestLoadIncrementalHFiles { * Test case that creates some regions and loads * HFiles that cross the boundaries of those regions */ - @Test + @Test(timeout = 60000) public void testRegionCrossingLoad() throws Exception { runTest("testRegionCrossingLoad", BloomType.NONE, new byte[][][] { @@ -123,7 +122,7 @@ public class TestLoadIncrementalHFiles { /** * Test loading into a column family that has a ROW bloom filter. */ - @Test + @Test(timeout = 60000) public void testRegionCrossingRowBloom() throws Exception { runTest("testRegionCrossingLoadRowBloom", BloomType.ROW, new byte[][][] { @@ -135,7 +134,7 @@ public class TestLoadIncrementalHFiles { /** * Test loading into a column family that has a ROWCOL bloom filter. */ - @Test + @Test(timeout = 60000) public void testRegionCrossingRowColBloom() throws Exception { runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL, new byte[][][] { @@ -148,7 +147,7 @@ public class TestLoadIncrementalHFiles { * Test case that creates some regions and loads HFiles that have * different region boundaries than the table pre-split. */ - @Test + @Test(timeout = 60000) public void testSimpleHFileSplit() throws Exception { runTest("testHFileSplit", BloomType.NONE, new byte[][] { @@ -166,7 +165,7 @@ public class TestLoadIncrementalHFiles { * Test case that creates some regions and loads HFiles that cross the boundaries * and have different region boundaries than the table pre-split. */ - @Test + @Test(timeout = 60000) public void testRegionCrossingHFileSplit() throws Exception { testRegionCrossingHFileSplit(BloomType.NONE); } @@ -175,7 +174,7 @@ public class TestLoadIncrementalHFiles { * Test case that creates some regions and loads HFiles that cross the boundaries * have a ROW bloom filter and a different region boundaries than the table pre-split. */ - @Test + @Test(timeout = 60000) public void testRegionCrossingHFileSplitRowBloom() throws Exception { testRegionCrossingHFileSplit(BloomType.ROW); } @@ -184,7 +183,7 @@ public class TestLoadIncrementalHFiles { * Test case that creates some regions and loads HFiles that cross the boundaries * have a ROWCOL bloom filter and a different region boundaries than the table pre-split. */ - @Test + @Test(timeout = 60000) public void testRegionCrossingHFileSplitRowColBloom() throws Exception { testRegionCrossingHFileSplit(BloomType.ROWCOL); } @@ -278,7 +277,7 @@ public class TestLoadIncrementalHFiles { /** * Test loading into a column family that does not exist. */ - @Test + @Test(timeout = 60000) public void testNonexistentColumnFamilyLoad() throws Exception { String testName = "testNonexistentColumnFamilyLoad"; byte[][][] hFileRanges = new byte[][][] { @@ -307,7 +306,7 @@ public class TestLoadIncrementalHFiles { } } - @Test + @Test(timeout = 60000) public void testSplitStoreFile() throws IOException { Path dir = util.getDataTestDirOnTestFS("testSplitHFile"); FileSystem fs = util.getTestFileSystem(); @@ -354,7 +353,7 @@ public class TestLoadIncrementalHFiles { map.put(last, value-1); } - @Test + @Test(timeout = 60000) public void testInferBoundaries() { TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); @@ -411,7 +410,7 @@ public class TestLoadIncrementalHFiles { } } - @Test + @Test(timeout = 60000) public void testLoadTooMayHFiles() throws Exception { Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles"); FileSystem fs = util.getTestFileSystem(); @@ -445,7 +444,7 @@ public class TestLoadIncrementalHFiles { loader.run(args); } - @Test + @Test(timeout = 60000) public void testTableWithCFNameStartWithUnderScore() throws Exception { Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore"); FileSystem fs = util.getTestFileSystem(); http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 0fc33db..2b251d7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; @@ -99,8 +100,8 @@ import org.apache.hadoop.hbase.regionserver.Leases; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; -import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -551,6 +552,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { } @Override + public ChoreService getChoreService() { + return null; + } + + @Override public void updateRegionFavoredNodesMapping(String encodedRegionName, List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) { } http://git-wip-us.apache.org/repos/asf/hbase/blob/538388c2/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java index 363411f..e3283e9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java @@ -28,6 +28,7 @@ import java.util.concurrent.Semaphore; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.Server; @@ -320,5 +321,10 @@ public class TestActiveMasterManager { public ActiveMasterManager getActiveMasterManager() { return activeMasterManager; } + + @Override + public ChoreService getChoreService() { + return null; + } } } \ No newline at end of file
