HBASE-6778 Deprecate Chore; it's a thread per task when we should have one thread to do all tasks (Jonathan Lawlor)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/af84b746 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/af84b746 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/af84b746 Branch: refs/heads/branch-1 Commit: af84b746ceab1e4e6ed8a37ce8f1f4546ad3df5c Parents: b9f5c6b Author: stack <[email protected]> Authored: Fri Jan 30 15:27:08 2015 -0800 Committer: stack <[email protected]> Committed: Fri Jan 30 15:27:08 2015 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/ConnectionManager.java | 79 +- .../java/org/apache/hadoop/hbase/AuthUtil.java | 12 +- .../java/org/apache/hadoop/hbase/Chore.java | 142 ---- .../org/apache/hadoop/hbase/ChoreService.java | 368 ++++++++ .../org/apache/hadoop/hbase/ScheduledChore.java | 330 ++++++++ .../apache/hadoop/hbase/TestChoreService.java | 844 +++++++++++++++++++ .../apache/hadoop/hbase/rest/RESTServlet.java | 14 +- .../apache/hadoop/hbase/HealthCheckChore.java | 8 +- .../java/org/apache/hadoop/hbase/Server.java | 5 + .../hadoop/hbase/master/CatalogJanitor.java | 9 +- .../hbase/master/ClusterStatusPublisher.java | 37 +- .../org/apache/hadoop/hbase/master/HMaster.java | 32 +- .../hadoop/hbase/master/SplitLogManager.java | 23 +- .../hbase/master/balancer/BalancerChore.java | 13 +- .../master/balancer/ClusterStatusChore.java | 13 +- .../hbase/master/cleaner/CleanerChore.java | 8 +- .../hbase/regionserver/HRegionServer.java | 109 ++- .../hbase/regionserver/HeapMemoryManager.java | 48 +- .../regionserver/RegionServerServices.java | 6 +- .../hbase/regionserver/ServerNonceManager.java | 15 +- .../regionserver/StorefileRefresherChore.java | 8 +- .../regionserver/ReplicationSyncUp.java | 6 + .../org/apache/hadoop/hbase/tool/Canary.java | 11 +- .../hadoop/hbase/util/ConnectionCache.java | 19 +- .../hadoop/hbase/MockRegionServerServices.java | 7 +- .../hadoop/hbase/backup/TestHFileArchiving.java | 11 +- 
.../TestZooKeeperTableArchiveClient.java | 7 +- .../mapreduce/TestLoadIncrementalHFiles.java | 30 +- .../hadoop/hbase/master/MockRegionServer.java | 8 +- .../hbase/master/TestActiveMasterManager.java | 6 + .../hadoop/hbase/master/TestCatalogJanitor.java | 19 +- .../hbase/master/TestClockSkewDetection.java | 6 + .../hbase/master/TestSplitLogManager.java | 6 + .../hbase/master/TestTableLockManager.java | 15 +- .../hbase/master/cleaner/TestHFileCleaner.java | 7 +- .../master/cleaner/TestHFileLinkCleaner.java | 16 +- .../hbase/master/cleaner/TestLogsCleaner.java | 6 + .../TestEndToEndSplitTransaction.java | 21 +- .../regionserver/TestHeapMemoryManager.java | 24 +- .../regionserver/TestServerNonceManager.java | 13 +- .../hbase/regionserver/TestSplitLogWorker.java | 6 + .../replication/TestReplicationStateZKImpl.java | 6 + .../TestReplicationTrackerZKImpl.java | 6 + .../TestReplicationSourceManager.java | 14 +- .../security/token/TestTokenAuthentication.java | 6 + .../apache/hadoop/hbase/util/MockServer.java | 6 + 46 files changed, 1931 insertions(+), 474 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index b22d456..63094da 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -42,9 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import 
org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -56,11 +54,11 @@ import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; @@ -159,8 +157,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse; @@ -563,8 +561,6 @@ class ConnectionManager { private final Object masterAndZKLock = new Object(); private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE; 
- private final DelayedClosing delayedClosing = - DelayedClosing.createAndStart(this); // thread executor shared by all HTableInterface instances created // by this connection @@ -1370,7 +1366,6 @@ class ConnectionManager { HConnection connection; MasterService.BlockingInterface stub; int userCount; - long keepAliveUntil = Long.MAX_VALUE; MasterServiceState (final HConnection connection) { super(); @@ -1616,71 +1611,6 @@ class ConnectionManager { } } - /** - * Creates a Chore thread to check the connections to master & zookeeper - * and close them when they reach their closing time ( - * {@link MasterServiceState#keepAliveUntil} and - * {@link #keepZooKeeperWatcherAliveUntil}). Keep alive time is - * managed by the release functions and the variable {@link #keepAlive} - */ - private static class DelayedClosing extends Chore implements Stoppable { - private HConnectionImplementation hci; - Stoppable stoppable; - - private DelayedClosing( - HConnectionImplementation hci, Stoppable stoppable){ - super( - "ZooKeeperWatcher and Master delayed closing for connection "+hci, - 60*1000, // We check every minutes - stoppable); - this.hci = hci; - this.stoppable = stoppable; - } - - static DelayedClosing createAndStart(HConnectionImplementation hci){ - Stoppable stoppable = new Stoppable() { - private volatile boolean isStopped = false; - @Override public void stop(String why) { isStopped = true;} - @Override public boolean isStopped() {return isStopped;} - }; - - return new DelayedClosing(hci, stoppable); - } - - protected void closeMasterProtocol(MasterServiceState protocolState) { - if (System.currentTimeMillis() > protocolState.keepAliveUntil) { - hci.closeMasterService(protocolState); - protocolState.keepAliveUntil = Long.MAX_VALUE; - } - } - - @Override - protected void chore() { - synchronized (hci.masterAndZKLock) { - if (hci.canCloseZKW) { - if (System.currentTimeMillis() > - hci.keepZooKeeperWatcherAliveUntil) { - - hci.closeZooKeeperWatcher(); - 
hci.keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE; - } - } - closeMasterProtocol(hci.masterServiceState); - closeMasterProtocol(hci.masterServiceState); - } - } - - @Override - public void stop(String why) { - stoppable.stop(why); - } - - @Override - public boolean isStopped() { - return stoppable.isStopped(); - } - } - private void closeZooKeeperWatcher() { synchronized (masterAndZKLock) { if (keepAliveZookeeper != null) { @@ -1703,7 +1633,6 @@ class ConnectionManager { private void resetMasterServiceState(final MasterServiceState mss) { mss.userCount++; - mss.keepAliveUntil = Long.MAX_VALUE; } @Override @@ -2054,9 +1983,6 @@ class ConnectionManager { if (mss.getStub() == null) return; synchronized (masterAndZKLock) { --mss.userCount; - if (mss.userCount <= 0) { - mss.keepAliveUntil = System.currentTimeMillis() + keepAlive; - } } } @@ -2356,7 +2282,6 @@ class ConnectionManager { if (this.closed) { return; } - delayedClosing.stop("Closing connection"); closeMaster(); shutdownBatchPool(); this.closed = true; http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java index 282b5e3..f597935 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Strings; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.net.DNS; import org.apache.hadoop.security.UserGroupInformation; @@ -47,12 +46,12 @@ public class AuthUtil { /** * Checks if security is 
enabled and if so, launches chore for refreshing kerberos ticket. */ - public static void launchAuthChore(Configuration conf) throws IOException { + public static ScheduledChore getAuthChore(Configuration conf) throws IOException { UserProvider userProvider = UserProvider.instantiate(conf); // login the principal (if using secure Hadoop) boolean securityEnabled = userProvider.isHadoopSecurityEnabled() && userProvider.isHBaseSecurityEnabled(); - if (!securityEnabled) return; + if (!securityEnabled) return null; String host = null; try { host = Strings.domainNamePointerToHostName(DNS.getDefaultHost( @@ -87,7 +86,8 @@ public class AuthUtil { // e.g. 5min tgt * 0.8 = 4min refresh so interval is better be way less than 1min final int CHECK_TGT_INTERVAL = 30 * 1000; // 30sec - Chore refreshCredentials = new Chore("RefreshCredentials", CHECK_TGT_INTERVAL, stoppable) { + ScheduledChore refreshCredentials = + new ScheduledChore("RefreshCredentials", stoppable, CHECK_TGT_INTERVAL) { @Override protected void chore() { try { @@ -97,7 +97,7 @@ public class AuthUtil { } } }; - // Start the chore for refreshing credentials - Threads.setDaemonThreadRunning(refreshCredentials.getThread()); + + return refreshCredentials; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-common/src/main/java/org/apache/hadoop/hbase/Chore.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Chore.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Chore.java deleted file mode 100644 index c2c7964..0000000 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Chore.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.HasThread; -import org.apache.hadoop.hbase.util.Sleeper; - -/** - * Chore is a task performed on a period in hbase. The chore is run in its own - * thread. This base abstract class provides while loop and sleeping facility. - * If an unhandled exception, the threads exit is logged. - * Implementers just need to add checking if there is work to be done and if - * so, do it. Its the base of most of the chore threads in hbase. - * - * <p>Don't subclass Chore if the task relies on being woken up for something to - * do, such as an entry being added to a queue, etc. - */ [email protected] -public abstract class Chore extends HasThread { - private final Log LOG = LogFactory.getLog(this.getClass()); - private final Sleeper sleeper; - protected final Stoppable stopper; - - /** - * @param p Period at which we should run. Will be adjusted appropriately - * should we find work and it takes time to complete. - * @param stopper When {@link Stoppable#isStopped()} is true, this thread will - * cleanup and exit cleanly. 
- */ - public Chore(String name, final int p, final Stoppable stopper) { - super(name); - if (stopper == null){ - throw new NullPointerException("stopper cannot be null"); - } - this.sleeper = new Sleeper(p, stopper); - this.stopper = stopper; - } - - /** - * This constructor is for test only. It allows to create an object and to call chore() on - * it. There is no sleeper nor stoppable. - */ - protected Chore(){ - sleeper = null; - stopper = null; - } - - /** - * @see java.lang.Thread#run() - */ - @Override - public void run() { - try { - boolean initialChoreComplete = false; - while (!this.stopper.isStopped()) { - long startTime = System.currentTimeMillis(); - try { - if (!initialChoreComplete) { - initialChoreComplete = initialChore(); - } else { - chore(); - } - } catch (Exception e) { - LOG.error("Caught exception", e); - if (this.stopper.isStopped()) { - continue; - } - } - this.sleeper.sleep(startTime); - } - } catch (Throwable t) { - LOG.fatal(getName() + "error", t); - } finally { - LOG.info(getName() + " exiting"); - cleanup(); - } - } - - /** - * If the thread is currently sleeping, trigger the core to happen immediately. - * If it's in the middle of its operation, will begin another operation - * immediately after finishing this one. - */ - public void triggerNow() { - this.sleeper.skipSleepCycle(); - } - - /* - * Exposed for TESTING! - * calls directly the chore method, from the current thread. - */ - public void choreForTesting() { - chore(); - } - - /** - * Override to run a task before we start looping. - * @return true if initial chore was successful - */ - protected boolean initialChore() { - // Default does nothing. - return true; - } - - /** - * Look for chores. If any found, do them else just return. - */ - protected abstract void chore(); - - /** - * Sleep for period. 
- */ - protected void sleep() { - this.sleeper.sleep(); - } - - /** - * Called when the chore has completed, allowing subclasses to cleanup any - * extra overhead - */ - protected void cleanup() { - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java new file mode 100644 index 0000000..fd6cbc9 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java @@ -0,0 +1,368 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * ChoreService is a service that can be used to schedule instances of {@link ScheduledChore} to run + * periodically while sharing threads. The ChoreService is backed by a + * {@link ScheduledThreadPoolExecutor} whose core pool size changes dynamically depending on the + * number of {@link ScheduledChore} scheduled. All of the threads in the core thread pool of the + * underlying {@link ScheduledThreadPoolExecutor} are set to be daemon threads. + * <p> + * The ChoreService provides the ability to schedule, cancel, and trigger instances of + * {@link ScheduledChore}. The ChoreService also provides the ability to check on the status of + * scheduled chores. The number of threads used by the ChoreService changes based on the scheduling + * load and whether or not the scheduled chores are executing on time. As more chores are scheduled, + * there may be a need to increase the number of threads if it is noticed that chores are no longer + * meeting their scheduled start times. On the other hand, as chores are cancelled, an attempt is + * made to reduce the number of running threads to see if chores can still meet their start times + * with a smaller thread pool. + * <p> + * When finished with a ChoreService it is good practice to call {@link ChoreService#shutdown()}. + * Calling this method ensures that all scheduled chores are cancelled and cleaned up properly. 
+ */ [email protected] +public class ChoreService implements ChoreServicer { + private final Log LOG = LogFactory.getLog(this.getClass()); + + /** + * The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor + */ + public final static int MIN_CORE_POOL_SIZE = 1; + + /** + * This thread pool is used to schedule all of the Chores + */ + private final ScheduledThreadPoolExecutor scheduler; + + /** + * Maps chores to their futures. Futures are used to control a chore's schedule + */ + private final HashMap<ScheduledChore, ScheduledFuture<?>> scheduledChores; + + /** + * Maps chores to Booleans which indicate whether or not a chore has caused an increase in the + * core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to + * increase the core pool size by 1 (otherwise a single long running chore whose execution is + * longer than its period would be able to spawn too many threads). + */ + private final HashMap<ScheduledChore, Boolean> choresMissingStartTime; + + /** + * The coreThreadPoolPrefix is the prefix that will be applied to all threads within the + * ScheduledThreadPoolExecutor. The prefix is typically related to the Server that the service is + * running on. The prefix is useful because it allows us to monitor how the thread pool of a + * particular service changes over time VIA thread dumps. + */ + private final String coreThreadPoolPrefix; + + /** + * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads + * spawned by this service + */ + public ChoreService(final String coreThreadPoolPrefix) { + this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE); + } + + /** + * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads + * spawned by this service + * @param corePoolSize The initial size to set the core pool of the ScheduledThreadPoolExecutor + * to during initialization. 
The default size is 1, but specifying a larger size may be + * beneficial if you know that 1 thread will not be enough. + */ + public ChoreService(final String coreThreadPoolPrefix, int corePoolSize) { + this.coreThreadPoolPrefix = coreThreadPoolPrefix; + if (corePoolSize < MIN_CORE_POOL_SIZE) corePoolSize = MIN_CORE_POOL_SIZE; + final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix); + scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); + scheduler.setRemoveOnCancelPolicy(true); + scheduledChores = new HashMap<ScheduledChore, ScheduledFuture<?>>(); + choresMissingStartTime = new HashMap<ScheduledChore, Boolean>(); + } + + /** + * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads + * spawned by this service + */ + public static ChoreService getInstance(final String coreThreadPoolPrefix) { + return new ChoreService(coreThreadPoolPrefix); + } + + /** + * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService + * instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled + * with a single ChoreService instance). + * @return true when the chore was successfully scheduled. false when the scheduling failed + * (typically occurs when a chore is scheduled during shutdown of service) + */ + public synchronized boolean scheduleChore(ScheduledChore chore) { + if (chore == null) return false; + + try { + ScheduledFuture<?> future = + scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), chore.getPeriod(), + chore.getTimeUnit()); + chore.setChoreServicer(this); + scheduledChores.put(chore, future); + return true; + } catch (Exception exception) { + if (LOG.isInfoEnabled()) { + LOG.info("Could not successfully schedule chore: " + chore.getName()); + } + return false; + } + } + + /** + * @param chore The Chore to be rescheduled. 
If the chore is not scheduled with this ChoreService + * yet then this call is equivalent to a call to scheduleChore. + */ + private synchronized void rescheduleChore(ScheduledChore chore) { + if (chore == null) return; + + if (scheduledChores.containsKey(chore)) { + ScheduledFuture<?> future = scheduledChores.get(chore); + future.cancel(false); + } + scheduleChore(chore); + } + + @Override + public synchronized void cancelChore(ScheduledChore chore) { + cancelChore(chore, false); + } + + @Override + public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) { + if (chore != null && scheduledChores.containsKey(chore)) { + ScheduledFuture<?> future = scheduledChores.get(chore); + future.cancel(mayInterruptIfRunning); + scheduledChores.remove(chore); + + // Removing a chore that was missing its start time means it may be possible + // to reduce the number of threads + if (choresMissingStartTime.containsKey(chore)) { + choresMissingStartTime.remove(chore); + requestCorePoolDecrease(); + } + } + } + + @Override + public synchronized boolean isChoreScheduled(ScheduledChore chore) { + return chore != null && scheduledChores.containsKey(chore) + && !scheduledChores.get(chore).isDone(); + } + + @Override + public synchronized boolean triggerNow(ScheduledChore chore) { + if (chore == null) { + return false; + } else { + rescheduleChore(chore); + return true; + } + } + + /** + * @return number of chores that this service currently has scheduled + */ + int getNumberOfScheduledChores() { + return scheduledChores.size(); + } + + /** + * @return number of chores that this service currently has scheduled that are missing their + * scheduled start time + */ + int getNumberOfChoresMissingStartTime() { + return choresMissingStartTime.size(); + } + + /** + * @return number of threads in the core pool of the underlying ScheduledThreadPoolExecutor + */ + int getCorePoolSize() { + return scheduler.getCorePoolSize(); + } + + /** + * Custom ThreadFactory 
used with the ScheduledThreadPoolExecutor so that all the threads are + * daemon threads, and thus, don't prevent the JVM from shutting down + */ + static class ChoreServiceThreadFactory implements ThreadFactory { + private final String threadPrefix; + private final static String THREAD_NAME_SUFFIX = "_ChoreService_"; + private AtomicInteger threadNumber = new AtomicInteger(1); + + /** + * @param threadPrefix The prefix given to all threads created by this factory + */ + public ChoreServiceThreadFactory(final String threadPrefix) { + this.threadPrefix = threadPrefix; + } + + @Override + public Thread newThread(Runnable r) { + Thread thread = + new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement()); + thread.setDaemon(true); + return thread; + } + } + + /** + * Represents a request to increase the number of core pool threads. Typically a request + * originates from the fact that the current core pool size is not sufficient to service all of + * the currently running Chores + * @return true when the request to increase the core pool size succeeds + */ + private synchronized boolean requestCorePoolIncrease() { + // There is no point in creating more threads than scheduledChores.size since scheduled runs + // of the same chore cannot run concurrently (i.e. happen-before behavior is enforced + // amongst occurrences of the same chore). + if (scheduler.getCorePoolSize() < scheduledChores.size()) { + scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1); + printChoreServiceDetails("requestCorePoolIncrease"); + return true; + } + return false; + } + + /** + * Represents a request to decrease the number of core pool threads. Typically a request + * originates from the fact that the current core pool size is more than sufficient to service the + * running Chores. 
+ */ + private synchronized void requestCorePoolDecrease() { + if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) { + scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1); + printChoreServiceDetails("requestCorePoolDecrease"); + } + } + + @Override + public synchronized void onChoreMissedStartTime(ScheduledChore chore) { + if (chore == null || !scheduledChores.containsKey(chore)) return; + + // If the chore has not caused an increase in the size of the core thread pool then request an + // increase. This allows each chore missing its start time to increase the core pool size by + // at most 1. + if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) { + choresMissingStartTime.put(chore, requestCorePoolIncrease()); + } + + // Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If + // the chore is NOT rescheduled, future executions of this chore will be delayed more and + // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates + // idle threads to chores based on how delayed they are. + rescheduleChore(chore); + printChoreDetails("onChoreMissedStartTime", chore); + } + + /** + * shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores + * in the middle of execution will be interrupted and shutdown. This service will be unusable + * after this method has been called (i.e. future scheduling attempts will fail). 
+ */ + public void shutdown() { + List<Runnable> ongoing = scheduler.shutdownNow(); + if (LOG.isInfoEnabled()) { + LOG.info("Chore service for: " + coreThreadPoolPrefix + " had " + ongoing + " on shutdown"); + } + cancelAllChores(true); + scheduledChores.clear(); + choresMissingStartTime.clear(); + } + + /** + * @return true when the service is shutdown and thus cannot be used anymore + */ + public boolean isShutdown() { + return scheduler.isShutdown(); + } + + /** + * @return true when the service is shutdown and all threads have terminated + */ + public boolean isTerminated() { + return scheduler.isTerminated(); + } + + private void cancelAllChores(final boolean mayInterruptIfRunning) { + ArrayList<ScheduledChore> choresToCancel = new ArrayList<ScheduledChore>(); + // Build list of chores to cancel so we can iterate through a set that won't change + // as chores are cancelled. If we tried to cancel each chore while iterating through + // keySet the results would be undefined because the keySet would be changing + for (ScheduledChore chore : scheduledChores.keySet()) { + choresToCancel.add(chore); + } + for (ScheduledChore chore : choresToCancel) { + chore.cancel(mayInterruptIfRunning); + } + choresToCancel.clear(); + } + + /** + * Prints a summary of important details about the chore. Used for debugging purposes + */ + private void printChoreDetails(final String header, ScheduledChore chore) { + LinkedHashMap<String, String> output = new LinkedHashMap<String, String>(); + output.put(header, ""); + output.put("Chore name: ", chore.getName()); + output.put("Chore period: ", Integer.toString(chore.getPeriod())); + output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns())); + + for (Entry<String, String> entry : output.entrySet()) { + if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue()); + } + } + + /** + * Prints a summary of important details about the service. 
Used for debugging purposes + */ + private void printChoreServiceDetails(final String header) { + LinkedHashMap<String, String> output = new LinkedHashMap<String, String>(); + output.put(header, ""); + output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize())); + output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores())); + output.put("ChoreService missingStartTimeCount: ", + Integer.toString(getNumberOfChoresMissingStartTime())); + + for (Entry<String, String> entry : output.entrySet()) { + if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue()); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java new file mode 100644 index 0000000..84002c5 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java @@ -0,0 +1,330 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once
+ * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The
+ * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for
+ * access to the threads in the core thread pool. If an unhandled exception occurs, the chore
+ * cancellation is logged. Implementers should consider whether or not the Chore will be able to
+ * execute within the defined period. It is bad practice to define a ScheduledChore whose execution
+ * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
+ * thread pool.
+ * <p>
+ * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
+ * an entry being added to a queue, etc.
+ * <p>
+ * {@link #run()} and the accessors of this class are synchronized on the chore instance, so an
+ * accessor called from another thread blocks while an execution of the chore is in progress.
+ */
+@InterfaceAudience.Private
+public abstract class ScheduledChore implements Runnable {
+  // Per-instance so that log lines are attributed to the concrete chore subclass.
+  private final Log LOG = LogFactory.getLog(this.getClass());
+
+  private final String name;
+
+  /**
+   * Default values for scheduling parameters should they be excluded during construction
+   */
+  private final static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
+  private final static long DEFAULT_INITIAL_DELAY = 0;
+
+  /**
+   * Scheduling parameters. Used by ChoreService when scheduling the chore to run periodically.
+   * Both period and initialDelay are expressed in timeUnit.
+   */
+  private final int period;
+  private final TimeUnit timeUnit;
+  private final long initialDelay;
+
+  /**
+   * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is
+   * not scheduled.
+   */
+  private ChoreServicer choreServicer;
+
+  /**
+   * Variables that encapsulate the meaningful state information. The two timestamps are wall
+   * clock milliseconds ({@link System#currentTimeMillis()}); -1 means "not recorded yet".
+   */
+  private long timeOfLastRun = -1;
+  private long timeOfThisRun = -1;
+  private boolean initialChoreComplete = false;
+
+  /**
+   * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been
+   * stopped, it will cancel itself. This is particularly useful in the case where a single stopper
+   * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)}
+   * command can cause many chores to stop together.
+   */
+  private final Stoppable stopper;
+
+  interface ChoreServicer {
+    /**
+     * Cancel any ongoing schedules that this chore has with the implementer of this interface.
+     */
+    public void cancelChore(ScheduledChore chore);
+    public void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning);
+
+    /**
+     * @return true when the chore is scheduled with the implementer of this interface
+     */
+    public boolean isChoreScheduled(ScheduledChore chore);
+
+    /**
+     * This method tries to execute the chore immediately. If the chore is executing at the time of
+     * this call, the chore will begin another execution as soon as the current execution finishes
+     * <p>
+     * If the chore is not scheduled with a ChoreService, this call will fail.
+     * @return false when the chore could not be triggered immediately
+     */
+    public boolean triggerNow(ScheduledChore chore);
+
+    /**
+     * A callback that tells the implementer of this interface that one of the scheduled chores is
+     * missing its start time. The implication of a chore missing its start time is that the
+     * service's current means of scheduling may not be sufficient to handle the number of ongoing
+     * chores (the other explanation is that the chore's execution time is greater than its
+     * scheduled period). The service should try to increase its concurrency when this callback is
+     * received.
+     * @param chore The chore that missed its start time
+     */
+    public void onChoreMissedStartTime(ScheduledChore chore);
+  }
+
+  /**
+   * This constructor is for test only. It allows us to create an object and to call chore() on it.
+   * The instance has no name and no stopper, so it must not be scheduled: {@link #run()}
+   * dereferences the stopper.
+   */
+  protected ScheduledChore() {
+    this.name = null;
+    this.stopper = null;
+    this.period = 0;
+    this.initialDelay = DEFAULT_INITIAL_DELAY;
+    this.timeUnit = DEFAULT_TIME_UNIT;
+  }
+
+  /**
+   * Creates a chore with no initial delay whose period is measured in milliseconds.
+   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
+   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
+   * @param period Period with which this Chore repeats execution when scheduled.
+   */
+  public ScheduledChore(final String name, Stoppable stopper, final int period) {
+    this(name, stopper, period, DEFAULT_INITIAL_DELAY);
+  }
+
+  /**
+   * Creates a chore whose period and initial delay are measured in milliseconds.
+   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
+   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
+   * @param period Period with which this Chore repeats execution when scheduled.
+   * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A
+   *          value of 0 means the chore will begin to execute immediately. Negative delays are
+   *          invalid and will be corrected to a value of 0.
+   */
+  public ScheduledChore(final String name, Stoppable stopper, final int period,
+      final long initialDelay) {
+    this(name, stopper, period, initialDelay, DEFAULT_TIME_UNIT);
+  }
+
+  /**
+   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
+   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
+   * @param period Period with which this Chore repeats execution when scheduled.
+   * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A
+   *          value of 0 means the chore will begin to execute immediately. Negative delays are
+   *          invalid and will be corrected to a value of 0.
+   * @param unit The unit that is used to measure period and initialDelay
+   */
+  public ScheduledChore(final String name, Stoppable stopper, final int period,
+      final long initialDelay, final TimeUnit unit) {
+    this.name = name;
+    this.stopper = stopper;
+    this.period = period;
+    this.initialDelay = initialDelay < 0 ? 0 : initialDelay;
+    this.timeUnit = unit;
+  }
+
+  /**
+   * Returns the run bookkeeping (both timestamps and the initial-chore flag) to the values a
+   * freshly constructed chore has. The scheduling parameters and the choreServicer are untouched.
+   */
+  synchronized void resetState() {
+    timeOfLastRun = -1;
+    timeOfThisRun = -1;
+    initialChoreComplete = false;
+  }
+
+  /**
+   * One scheduled execution. Exactly one of three things happens:
+   * <ul>
+   * <li>the run started too late: the servicer is notified and no chore work is done this time;
+   * <li>the stopper is stopped: the chore cancels itself and runs {@link #cleanup()};
+   * <li>otherwise {@link #initialChore()} is run until it reports success, and {@link #chore()} on
+   * every execution after that.
+   * </ul>
+   * @see java.lang.Runnable#run()
+   */
+  @Override
+  public synchronized void run() {
+    timeOfLastRun = timeOfThisRun;
+    timeOfThisRun = System.currentTimeMillis();
+    if (missedStartTime() && choreServicer != null) {
+      choreServicer.onChoreMissedStartTime(this);
+      if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " missed its start time");
+    } else if (stopper.isStopped()) {
+      cancel();
+      cleanup();
+      LOG.info("Chore: " + getName() + " was stopped");
+    } else {
+      try {
+        if (!initialChoreComplete) {
+          initialChoreComplete = initialChore();
+        } else {
+          chore();
+        }
+      } catch (Throwable t) {
+        // Swallow the failure so that it does not escape to the executor; the chore only gives
+        // up when it has also been asked to stop.
+        LOG.error("Caught error", t);
+        if (this.stopper.isStopped()) {
+          cancel();
+          cleanup();
+        }
+      }
+    }
+  }
+
+  /**
+   * @return How long it has been, in milliseconds, between the start of the previous run and the
+   *         start of this run. Useful for checking if the chore has missed its scheduled start
+   *         time by too large of a margin
+   */
+  synchronized long getTimeBetweenRuns() {
+    return timeOfThisRun - timeOfLastRun;
+  }
+
+  /**
+   * @return true when the time between runs exceeds the acceptable threshold
+   */
+  private synchronized boolean missedStartTime() {
+    return isValidTime(timeOfLastRun) && isValidTime(timeOfThisRun)
+        && getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns();
+  }
+
+  /**
+   * @return the longest gap between the starts of two consecutive runs, in milliseconds, that is
+   *         not considered a missed start time: one and a half periods
+   */
+  private synchronized double getMaximumAllowedTimeBetweenRuns() {
+    // Threshold used to determine if the Chore's current run started too late. The run
+    // timestamps are in milliseconds while period is expressed in timeUnit, so convert the
+    // period before comparing; otherwise the threshold is wrong for any non-millisecond unit.
+    return 1.5 * timeUnit.toMillis(period);
+  }
+
+  /**
+   * @return true when time is a recorded timestamp, i.e. positive (not the -1 placeholder) and
+   *         not in the future
+   */
+  private synchronized boolean isValidTime(final long time) {
+    return time > 0 && time <= System.currentTimeMillis();
+  }
+
+  /**
+   * @return false when the Chore is not currently scheduled with a ChoreService
+   */
+  public synchronized boolean triggerNow() {
+    if (choreServicer != null) {
+      return choreServicer.triggerNow(this);
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Records the service this chore is now scheduled with.
+   * @param service the new servicer
+   */
+  synchronized void setChoreServicer(ChoreServicer service) {
+    // Chores should only ever be scheduled with a single ChoreService. If the choreServicer
+    // is changing, cancel any existing schedules of this chore.
+    if (choreServicer != null && choreServicer != service) {
+      choreServicer.cancelChore(this, false);
+    }
+    choreServicer = service;
+    // NOTE(review): stamping the scheduling time here makes the first run() measure its gap from
+    // this moment, so an initialDelay longer than 1.5 periods would presumably be reported as a
+    // missed start time and the first execution skipped -- confirm against ChoreService.
+    timeOfThisRun = System.currentTimeMillis();
+  }
+
+  /**
+   * Cancels this chore without interrupting an execution that is in progress.
+   */
+  public synchronized void cancel() {
+    cancel(false);
+  }
+
+  /**
+   * Cancels this chore with its servicer, if it has one, and forgets the servicer.
+   * @param mayInterruptIfRunning passed through to
+   *          {@link ChoreServicer#cancelChore(ScheduledChore, boolean)}
+   */
+  public synchronized void cancel(boolean mayInterruptIfRunning) {
+    if (choreServicer != null) choreServicer.cancelChore(this, mayInterruptIfRunning);
+
+    choreServicer = null;
+  }
+
+  public synchronized String getName() {
+    return name;
+  }
+
+  public synchronized Stoppable getStopper() {
+    return stopper;
+  }
+
+  /**
+   * @return the period of this chore, measured in {@link #getTimeUnit()}
+   */
+  public synchronized int getPeriod() {
+    return period;
+  }
+
+  /**
+   * @return the initial delay of this chore, measured in {@link #getTimeUnit()}; never negative
+   *         for chores created through the public constructors
+   */
+  public synchronized long getInitialDelay() {
+    return initialDelay;
+  }
+
+  public final synchronized TimeUnit getTimeUnit() {
+    return timeUnit;
+  }
+
+  public synchronized boolean isInitialChoreComplete() {
+    return initialChoreComplete;
+  }
+
+  @VisibleForTesting
+  synchronized ChoreServicer getChoreServicer() {
+    return choreServicer;
+  }
+
+  @VisibleForTesting
+  synchronized long getTimeOfLastRun() {
+    return timeOfLastRun;
+  }
+
+  @VisibleForTesting
+  synchronized long getTimeOfThisRun() {
+    return timeOfThisRun;
+  }
+
+  /**
+   * @return true when this Chore is scheduled with a ChoreService
+   */
+  public synchronized boolean isScheduled() {
+    return choreServicer != null && choreServicer.isChoreScheduled(this);
+  }
+
+  /**
+   * Runs {@link #chore()} directly on the calling thread, bypassing the bookkeeping in
+   * {@link #run()}.
+   */
+  @VisibleForTesting
+  public synchronized void choreForTesting() {
+    chore();
+  }
+
+  /**
+   * The task to execute on each scheduled execution of the Chore
+   */
+  protected abstract void chore();
+
+  /**
+   * Override to run a task before we start looping. It is retried on each scheduled execution
+   * until it returns true; only then does {@link #chore()} start being called.
+   * @return true if initial chore was successful
+   */
+  protected synchronized boolean initialChore() {
+    // Default does nothing
+    return true;
+  }
+
+  /**
+   * Override to run cleanup tasks when the Chore stops running, either because its stopper was
+   * stopped or because it encountered an error while the stopper was stopped
+   */
+  protected synchronized void cleanup() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java
new file mode 100644
index 0000000..35cc530
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java
@@ -0,0 +1,844 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.CountingChore; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.DoNothingChore; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.FailInitialChore; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SampleStopper; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SleepingChore; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SlowChore; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestChoreService { + private final Log LOG = LogFactory.getLog(this.getClass()); + private final String TEST_SERVER_NAME = "testServerName"; + + /** + * A few ScheduledChore samples that are useful for testing with ChoreService + */ + public static class ScheduledChoreSamples { + /** + * Straight forward stopper implementation that is used by default when one is not provided + */ + public static class SampleStopper implements Stoppable { + private boolean stopped = false; + + @Override + public void stop(String why) { + stopped = true; + } + + @Override + public boolean isStopped() { + return stopped; + } + } + + /** + * Sleeps for longer than the scheduled period. 
This chore always misses its scheduled periodic + * executions + */ + public static class SlowChore extends ScheduledChore { + public SlowChore(String name, int period) { + this(name, new SampleStopper(), period); + } + + public SlowChore(String name, Stoppable stopper, int period) { + super(name, stopper, period); + } + + @Override + protected boolean initialChore() { + try { + Thread.sleep(getPeriod() * 2); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return true; + } + + @Override + protected void chore() { + try { + Thread.sleep(getPeriod() * 2); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + /** + * Lightweight ScheduledChore used primarily to fill the scheduling queue in tests + */ + public static class DoNothingChore extends ScheduledChore { + public DoNothingChore(String name, int period) { + super(name, new SampleStopper(), period); + } + + public DoNothingChore(String name, Stoppable stopper, int period) { + super(name, stopper, period); + } + + @Override + protected void chore() { + // DO NOTHING + } + + } + + public static class SleepingChore extends ScheduledChore { + private int sleepTime; + + public SleepingChore(String name, int chorePeriod, int sleepTime) { + this(name, new SampleStopper(), chorePeriod, sleepTime); + } + + public SleepingChore(String name, Stoppable stopper, int period, int sleepTime) { + super(name, stopper, period); + this.sleepTime = sleepTime; + } + + @Override + protected boolean initialChore() { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return true; + } + + @Override + protected void chore() { + try { + Thread.sleep(sleepTime); + } catch (Exception e) { + System.err.println(e.getStackTrace()); + } + } + } + + public static class CountingChore extends ScheduledChore { + private int countOfChoreCalls; + private boolean outputOnTicks = false; + + public CountingChore(String name, int period) { + this(name, new 
SampleStopper(), period); + } + + public CountingChore(String name, Stoppable stopper, int period) { + this(name, stopper, period, false); + } + + public CountingChore(String name, Stoppable stopper, int period, + final boolean outputOnTicks) { + super(name, stopper, period); + this.countOfChoreCalls = 0; + this.outputOnTicks = outputOnTicks; + } + + @Override + protected boolean initialChore() { + countOfChoreCalls++; + if (outputOnTicks) outputTickCount(); + return true; + } + + @Override + protected void chore() { + countOfChoreCalls++; + if (outputOnTicks) outputTickCount(); + } + + private void outputTickCount() { + System.out.println("Chore: " + getName() + ". Count of chore calls: " + countOfChoreCalls); + } + + public int getCountOfChoreCalls() { + return countOfChoreCalls; + } + + public boolean isOutputtingOnTicks() { + return outputOnTicks; + } + + public void setOutputOnTicks(boolean o) { + outputOnTicks = o; + } + } + + /** + * A Chore that will try to execute the initial chore a few times before succeeding. Once the + * initial chore is complete the chore cancels itself + */ + public static class FailInitialChore extends ScheduledChore { + private int numberOfFailures; + private int failureThreshold; + + /** + * @param failThreshold Number of times the Chore fails when trying to execute initialChore + * before succeeding. 
+ */ + public FailInitialChore(String name, int period, int failThreshold) { + this(name, new SampleStopper(), period, failThreshold); + } + + public FailInitialChore(String name, Stoppable stopper, int period, int failThreshold) { + super(name, stopper, period); + numberOfFailures = 0; + failureThreshold = failThreshold; + } + + @Override + protected boolean initialChore() { + if (numberOfFailures < failureThreshold) { + numberOfFailures++; + return false; + } else { + return true; + } + } + + @Override + protected void chore() { + assertTrue(numberOfFailures == failureThreshold); + cancel(false); + } + + } + } + + @Test + public void testInitialChorePrecedence() throws InterruptedException { + ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME); + + final int period = 100; + final int failureThreshold = 5; + ScheduledChore chore = new FailInitialChore("chore", period, failureThreshold); + service.scheduleChore(chore); + + int loopCount = 0; + boolean brokeOutOfLoop = false; + + while (!chore.isInitialChoreComplete() && chore.isScheduled()) { + Thread.sleep(failureThreshold * period); + loopCount++; + if (loopCount > 3) { + brokeOutOfLoop = true; + break; + } + } + + assertFalse(brokeOutOfLoop); + } + + @Test + public void testCancelChore() throws InterruptedException { + final int period = 100; + ScheduledChore chore1 = new DoNothingChore("chore1", period); + ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME); + + service.scheduleChore(chore1); + assertTrue(chore1.isScheduled()); + + chore1.cancel(true); + assertFalse(chore1.isScheduled()); + assertTrue(service.getNumberOfScheduledChores() == 0); + } + + @Test + public void testScheduledChoreConstruction() { + final String NAME = "chore"; + final int PERIOD = 100; + final long VALID_DELAY = 0; + final long INVALID_DELAY = -100; + final TimeUnit UNIT = TimeUnit.NANOSECONDS; + + ScheduledChore chore1 = + new ScheduledChore(NAME, new SampleStopper(), PERIOD, VALID_DELAY, UNIT) { + 
@Override + protected void chore() { + // DO NOTHING + } + }; + + assertEquals("Name construction failed", chore1.getName(), NAME); + assertEquals("Period construction failed", chore1.getPeriod(), PERIOD); + assertEquals("Initial Delay construction failed", chore1.getInitialDelay(), VALID_DELAY); + assertEquals("TimeUnit construction failed", chore1.getTimeUnit(), UNIT); + + ScheduledChore invalidDelayChore = + new ScheduledChore(NAME, new SampleStopper(), PERIOD, INVALID_DELAY, UNIT) { + @Override + protected void chore() { + // DO NOTHING + } + }; + + assertEquals("Initial Delay should be set to 0 when invalid", 0, + invalidDelayChore.getInitialDelay()); + } + + @Test + public void testChoreServiceConstruction() { + final int corePoolSize = 10; + final int defaultCorePoolSize = ChoreService.MIN_CORE_POOL_SIZE; + + ChoreService customInit = new ChoreService(TEST_SERVER_NAME, corePoolSize); + assertEquals(corePoolSize, customInit.getCorePoolSize()); + + ChoreService defaultInit = new ChoreService(TEST_SERVER_NAME); + assertEquals(defaultCorePoolSize, defaultInit.getCorePoolSize()); + + ChoreService invalidInit = new ChoreService(TEST_SERVER_NAME, -10); + assertEquals(defaultCorePoolSize, invalidInit.getCorePoolSize()); + } + + @Test + public void testFrequencyOfChores() throws InterruptedException { + final int period = 100; + // Small delta that acts as time buffer (allowing chores to complete if running slowly) + final int delta = 5; + ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME); + CountingChore chore = new CountingChore("countingChore", period); + service.scheduleChore(chore); + + Thread.sleep(10 * period + delta); + assertTrue(chore.getCountOfChoreCalls() == 11); + + Thread.sleep(10 * period); + assertTrue(chore.getCountOfChoreCalls() == 21); + } + + @Test + public void testForceTrigger() throws InterruptedException { + final int period = 100; + final int delta = 5; + ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME); + 
CountingChore chore = new CountingChore("countingChore", period); + service.scheduleChore(chore); + Thread.sleep(10 * period + delta); + + assertTrue(chore.getCountOfChoreCalls() == 11); + + // Force five runs of the chore to occur, sleeping between triggers to ensure the + // chore has time to run + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + + assertTrue(chore.getCountOfChoreCalls() == 16); + + Thread.sleep(10 * period + delta); + + assertTrue(chore.getCountOfChoreCalls() == 26); + } + + @Test + public void testCorePoolIncrease() throws InterruptedException { + final int initialCorePoolSize = 3; + ChoreService service = new ChoreService(TEST_SERVER_NAME, initialCorePoolSize); + assertEquals("Should have a core pool of size: " + initialCorePoolSize, initialCorePoolSize, + service.getCorePoolSize()); + + final int slowChorePeriod = 100; + SlowChore slowChore1 = new SlowChore("slowChore1", slowChorePeriod); + SlowChore slowChore2 = new SlowChore("slowChore2", slowChorePeriod); + SlowChore slowChore3 = new SlowChore("slowChore3", slowChorePeriod); + + service.scheduleChore(slowChore1); + service.scheduleChore(slowChore2); + service.scheduleChore(slowChore3); + + Thread.sleep(slowChorePeriod * 10); + assertEquals("Should not create more pools than scheduled chores", 3, + service.getCorePoolSize()); + + SlowChore slowChore4 = new SlowChore("slowChore4", slowChorePeriod); + service.scheduleChore(slowChore4); + + Thread.sleep(slowChorePeriod * 10); + assertEquals("Chores are missing their start time. Should expand core pool size", 4, + service.getCorePoolSize()); + + SlowChore slowChore5 = new SlowChore("slowChore5", slowChorePeriod); + service.scheduleChore(slowChore5); + + Thread.sleep(slowChorePeriod * 10); + assertEquals("Chores are missing their start time. 
Should expand core pool size", 5, + service.getCorePoolSize()); + } + + @Test + public void testCorePoolDecrease() throws InterruptedException { + final int initialCorePoolSize = 3; + ChoreService service = new ChoreService(TEST_SERVER_NAME, initialCorePoolSize); + final int chorePeriod = 10; + + // Slow chores always miss their start time and thus the core pool size should be at least as + // large as the number of running slow chores + SlowChore slowChore1 = new SlowChore("slowChore1", chorePeriod); + SlowChore slowChore2 = new SlowChore("slowChore2", chorePeriod); + SlowChore slowChore3 = new SlowChore("slowChore3", chorePeriod); + + service.scheduleChore(slowChore1); + service.scheduleChore(slowChore2); + service.scheduleChore(slowChore3); + + Thread.sleep(chorePeriod * 10); + assertEquals("Should not create more pools than scheduled chores", + service.getNumberOfScheduledChores(), service.getCorePoolSize()); + + SlowChore slowChore4 = new SlowChore("slowChore4", chorePeriod); + service.scheduleChore(slowChore4); + Thread.sleep(chorePeriod * 10); + assertEquals("Chores are missing their start time. Should expand core pool size", + service.getNumberOfScheduledChores(), service.getCorePoolSize()); + + SlowChore slowChore5 = new SlowChore("slowChore5", chorePeriod); + service.scheduleChore(slowChore5); + Thread.sleep(chorePeriod * 10); + assertEquals("Chores are missing their start time. 
Should expand core pool size", + service.getNumberOfScheduledChores(), service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 5); + + slowChore5.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 4); + + slowChore4.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 3); + + slowChore3.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + + slowChore2.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 1); + + slowChore1.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 0); + + slowChore1.resetState(); + service.scheduleChore(slowChore1); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 1); + + slowChore2.resetState(); + service.scheduleChore(slowChore2); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + + 
DoNothingChore fastChore1 = new DoNothingChore("fastChore1", chorePeriod); + service.scheduleChore(fastChore1); + Thread.sleep(chorePeriod * 10); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + assertEquals("Should increase", 3, service.getCorePoolSize()); + + DoNothingChore fastChore2 = new DoNothingChore("fastChore2", chorePeriod); + service.scheduleChore(fastChore2); + Thread.sleep(chorePeriod * 10); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + assertEquals("Should increase", 3, service.getCorePoolSize()); + + DoNothingChore fastChore3 = new DoNothingChore("fastChore3", chorePeriod); + service.scheduleChore(fastChore3); + Thread.sleep(chorePeriod * 10); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + assertEquals("Should not change", 3, service.getCorePoolSize()); + + DoNothingChore fastChore4 = new DoNothingChore("fastChore4", chorePeriod); + service.scheduleChore(fastChore4); + Thread.sleep(chorePeriod * 10); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + assertEquals("Should not change", 3, service.getCorePoolSize()); + } + + @Test + public void testNumberOfRunningChores() throws InterruptedException { + ChoreService service = new ChoreService(TEST_SERVER_NAME); + + final int period = 100; + final int sleepTime = 5; + + DoNothingChore dn1 = new DoNothingChore("dn1", period); + DoNothingChore dn2 = new DoNothingChore("dn2", period); + DoNothingChore dn3 = new DoNothingChore("dn3", period); + DoNothingChore dn4 = new DoNothingChore("dn4", period); + DoNothingChore dn5 = new DoNothingChore("dn5", period); + + service.scheduleChore(dn1); + service.scheduleChore(dn2); + service.scheduleChore(dn3); + service.scheduleChore(dn4); + service.scheduleChore(dn5); + + Thread.sleep(sleepTime); + assertEquals("Scheduled chore mismatch", 5, service.getNumberOfScheduledChores()); + + dn1.cancel(); + Thread.sleep(sleepTime); + assertEquals("Scheduled chore mismatch", 4, 
service.getNumberOfScheduledChores()); + + dn2.cancel(); + dn3.cancel(); + dn4.cancel(); + Thread.sleep(sleepTime); + assertEquals("Scheduled chore mismatch", 1, service.getNumberOfScheduledChores()); + + dn5.cancel(); + Thread.sleep(sleepTime); + assertEquals("Scheduled chore mismatch", 0, service.getNumberOfScheduledChores()); + } + + @Test + public void testNumberOfChoresMissingStartTime() throws InterruptedException { + ChoreService service = new ChoreService(TEST_SERVER_NAME); + + final int period = 100; + final int sleepTime = 5 * period; + + // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores + // ALWAYS miss their start time since their execution takes longer than their period + SlowChore sc1 = new SlowChore("sc1", period); + SlowChore sc2 = new SlowChore("sc2", period); + SlowChore sc3 = new SlowChore("sc3", period); + SlowChore sc4 = new SlowChore("sc4", period); + SlowChore sc5 = new SlowChore("sc5", period); + + service.scheduleChore(sc1); + service.scheduleChore(sc2); + service.scheduleChore(sc3); + service.scheduleChore(sc4); + service.scheduleChore(sc5); + + Thread.sleep(sleepTime); + assertEquals(5, service.getNumberOfChoresMissingStartTime()); + + sc1.cancel(); + Thread.sleep(sleepTime); + assertEquals(4, service.getNumberOfChoresMissingStartTime()); + + sc2.cancel(); + sc3.cancel(); + sc4.cancel(); + Thread.sleep(sleepTime); + assertEquals(1, service.getNumberOfChoresMissingStartTime()); + + sc5.cancel(); + Thread.sleep(sleepTime); + assertEquals(0, service.getNumberOfChoresMissingStartTime()); + } + + /** + * ChoreServices should never have a core pool size that exceeds the number of chores that have + * been scheduled with the service. 
For example, if 4 ScheduledChores are scheduled with a + * ChoreService, the number of threads in the ChoreService's core pool should never exceed 4 + */ + @Test + public void testMaximumChoreServiceThreads() throws InterruptedException { + ChoreService service = new ChoreService(TEST_SERVER_NAME); + + final int period = 10; + final int sleepTime = 5 * period; + + // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores + // ALWAYS miss their start time since their execution takes longer than their period. + // Chores that miss their start time will trigger the onChoreMissedStartTime callback + // in the ChoreService. This callback will try to increase the number of core pool + // threads. + SlowChore sc1 = new SlowChore("sc1", period); + SlowChore sc2 = new SlowChore("sc2", period); + SlowChore sc3 = new SlowChore("sc3", period); + SlowChore sc4 = new SlowChore("sc4", period); + SlowChore sc5 = new SlowChore("sc5", period); + + service.scheduleChore(sc1); + service.scheduleChore(sc2); + service.scheduleChore(sc3); + service.scheduleChore(sc4); + service.scheduleChore(sc5); + + Thread.sleep(sleepTime); + assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores()); + + SlowChore sc6 = new SlowChore("sc6", period); + SlowChore sc7 = new SlowChore("sc7", period); + SlowChore sc8 = new SlowChore("sc8", period); + SlowChore sc9 = new SlowChore("sc9", period); + SlowChore sc10 = new SlowChore("sc10", period); + + service.scheduleChore(sc6); + service.scheduleChore(sc7); + service.scheduleChore(sc8); + service.scheduleChore(sc9); + service.scheduleChore(sc10); + + Thread.sleep(sleepTime); + assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores()); + } + + @Test + public void testScheduledChoreReset() throws InterruptedException { + final int period = 100; + ChoreService service = new ChoreService(TEST_SERVER_NAME); + ScheduledChore chore = new DoNothingChore("sampleChore", period); + + // TRUE + 
assertTrue(!chore.isInitialChoreComplete()); + assertTrue(chore.getTimeOfLastRun() == -1); + assertTrue(chore.getTimeOfThisRun() == -1); + + service.scheduleChore(chore); + Thread.sleep(5 * period); + + // FALSE + assertFalse(!chore.isInitialChoreComplete()); + assertFalse(chore.getTimeOfLastRun() == -1); + assertFalse(chore.getTimeOfThisRun() == -1); + + chore.resetState(); + + // TRUE + assertTrue(!chore.isInitialChoreComplete()); + assertTrue(chore.getTimeOfLastRun() == -1); + assertTrue(chore.getTimeOfThisRun() == -1); + } + + @Test + public void testChangingChoreServices() throws InterruptedException { + final int period = 100; + final int sleepTime = 10; + ChoreService service1 = new ChoreService(TEST_SERVER_NAME); + ChoreService service2 = new ChoreService(TEST_SERVER_NAME); + ScheduledChore chore = new DoNothingChore("sample", period); + + assertFalse(chore.isScheduled()); + assertFalse(service1.isChoreScheduled(chore)); + assertFalse(service2.isChoreScheduled(chore)); + assertTrue(chore.getChoreServicer() == null); + + service1.scheduleChore(chore); + Thread.sleep(sleepTime); + assertTrue(chore.isScheduled()); + assertTrue(service1.isChoreScheduled(chore)); + assertFalse(service2.isChoreScheduled(chore)); + assertFalse(chore.getChoreServicer() == null); + + service2.scheduleChore(chore); + Thread.sleep(sleepTime); + assertTrue(chore.isScheduled()); + assertFalse(service1.isChoreScheduled(chore)); + assertTrue(service2.isChoreScheduled(chore)); + assertFalse(chore.getChoreServicer() == null); + + chore.cancel(); + assertFalse(chore.isScheduled()); + assertFalse(service1.isChoreScheduled(chore)); + assertFalse(service2.isChoreScheduled(chore)); + assertTrue(chore.getChoreServicer() == null); + } + + @Test + public void testTriggerNowFailsWhenNotScheduled() throws InterruptedException { + final int period = 100; + // Small sleep time buffer to allow CountingChore to complete + final int sleep = 5; + ChoreService service = new ChoreService(TEST_SERVER_NAME); + 
CountingChore chore = new CountingChore("dn", period); + + assertFalse(chore.triggerNow()); + assertTrue(chore.getCountOfChoreCalls() == 0); + + service.scheduleChore(chore); + Thread.sleep(sleep); + assertEquals(1, chore.getCountOfChoreCalls()); + Thread.sleep(period); + assertEquals(2, chore.getCountOfChoreCalls()); + assertTrue(chore.triggerNow()); + Thread.sleep(sleep); + assertTrue(chore.triggerNow()); + Thread.sleep(sleep); + assertTrue(chore.triggerNow()); + Thread.sleep(sleep); + assertEquals(5, chore.getCountOfChoreCalls()); + } + + @Test + public void testStopperForScheduledChores() throws InterruptedException { + ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME); + Stoppable stopperForGroup1 = new SampleStopper(); + Stoppable stopperForGroup2 = new SampleStopper(); + final int period = 100; + final int delta = 10; + + ScheduledChore chore1_group1 = new DoNothingChore("c1g1", stopperForGroup1, period); + ScheduledChore chore2_group1 = new DoNothingChore("c2g1", stopperForGroup1, period); + ScheduledChore chore3_group1 = new DoNothingChore("c3g1", stopperForGroup1, period); + + ScheduledChore chore1_group2 = new DoNothingChore("c1g2", stopperForGroup2, period); + ScheduledChore chore2_group2 = new DoNothingChore("c2g2", stopperForGroup2, period); + ScheduledChore chore3_group2 = new DoNothingChore("c3g2", stopperForGroup2, period); + + service.scheduleChore(chore1_group1); + service.scheduleChore(chore2_group1); + service.scheduleChore(chore3_group1); + service.scheduleChore(chore1_group2); + service.scheduleChore(chore2_group2); + service.scheduleChore(chore3_group2); + + Thread.sleep(delta); + Thread.sleep(10 * period); + assertTrue(chore1_group1.isScheduled()); + assertTrue(chore2_group1.isScheduled()); + assertTrue(chore3_group1.isScheduled()); + assertTrue(chore1_group2.isScheduled()); + assertTrue(chore2_group2.isScheduled()); + assertTrue(chore3_group2.isScheduled()); + + stopperForGroup1.stop("test stopping group 1"); + 
Thread.sleep(period); + assertFalse(chore1_group1.isScheduled()); + assertFalse(chore2_group1.isScheduled()); + assertFalse(chore3_group1.isScheduled()); + assertTrue(chore1_group2.isScheduled()); + assertTrue(chore2_group2.isScheduled()); + assertTrue(chore3_group2.isScheduled()); + + stopperForGroup2.stop("test stopping group 2"); + Thread.sleep(period); + assertFalse(chore1_group1.isScheduled()); + assertFalse(chore2_group1.isScheduled()); + assertFalse(chore3_group1.isScheduled()); + assertFalse(chore1_group2.isScheduled()); + assertFalse(chore2_group2.isScheduled()); + assertFalse(chore3_group2.isScheduled()); + } + + @Test + public void testShutdownCancelsScheduledChores() throws InterruptedException { + final int period = 100; + ChoreService service = new ChoreService(TEST_SERVER_NAME); + ScheduledChore successChore1 = new DoNothingChore("sc1", period); + ScheduledChore successChore2 = new DoNothingChore("sc2", period); + ScheduledChore successChore3 = new DoNothingChore("sc3", period); + + assertTrue(service.scheduleChore(successChore1)); + assertTrue(successChore1.isScheduled()); + assertTrue(service.scheduleChore(successChore2)); + assertTrue(successChore2.isScheduled()); + assertTrue(service.scheduleChore(successChore3)); + assertTrue(successChore3.isScheduled()); + + service.shutdown(); + + assertFalse(successChore1.isScheduled()); + assertFalse(successChore2.isScheduled()); + assertFalse(successChore3.isScheduled()); + } + + @Test + public void testShutdownWorksWhileChoresAreExecuting() throws InterruptedException { + final int period = 100; + final int sleep = 5 * period; + ChoreService service = new ChoreService(TEST_SERVER_NAME); + ScheduledChore slowChore1 = new SleepingChore("sc1", period, sleep); + ScheduledChore slowChore2 = new SleepingChore("sc2", period, sleep); + ScheduledChore slowChore3 = new SleepingChore("sc3", period, sleep); + + assertTrue(service.scheduleChore(slowChore1)); + assertTrue(service.scheduleChore(slowChore2)); + 
assertTrue(service.scheduleChore(slowChore3)); + + Thread.sleep(sleep / 2); + service.shutdown(); + + assertFalse(slowChore1.isScheduled()); + assertFalse(slowChore2.isScheduled()); + assertFalse(slowChore3.isScheduled()); + assertTrue(service.isShutdown()); + + Thread.sleep(5); + assertTrue(service.isTerminated()); + } + + @Test + public void testShutdownRejectsNewSchedules() throws InterruptedException { + final int period = 100; + ChoreService service = new ChoreService(TEST_SERVER_NAME); + ScheduledChore successChore1 = new DoNothingChore("sc1", period); + ScheduledChore successChore2 = new DoNothingChore("sc2", period); + ScheduledChore successChore3 = new DoNothingChore("sc3", period); + ScheduledChore failChore1 = new DoNothingChore("fc1", period); + ScheduledChore failChore2 = new DoNothingChore("fc2", period); + ScheduledChore failChore3 = new DoNothingChore("fc3", period); + + assertTrue(service.scheduleChore(successChore1)); + assertTrue(successChore1.isScheduled()); + assertTrue(service.scheduleChore(successChore2)); + assertTrue(successChore2.isScheduled()); + assertTrue(service.scheduleChore(successChore3)); + assertTrue(successChore3.isScheduled()); + + service.shutdown(); + + assertFalse(service.scheduleChore(failChore1)); + assertFalse(failChore1.isScheduled()); + assertFalse(service.scheduleChore(failChore2)); + assertFalse(failChore2.isScheduled()); + assertFalse(service.scheduleChore(failChore3)); + assertFalse(failChore3.isScheduled()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java index ff42271..bb93bc8 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java +++ 
b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java @@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.rest; import java.io.IOException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.ParseFilter; @@ -74,7 +74,10 @@ public class RESTServlet implements Constants { } public synchronized static void stop() { - if (INSTANCE != null) INSTANCE = null; + if (INSTANCE != null) { + INSTANCE.shutdown(); + INSTANCE = null; + } } /** @@ -130,6 +133,13 @@ public class RESTServlet implements Constants { connectionCache.setEffectiveUser(effectiveUser); } + /** + * Shutdown any services that need to stop + */ + void shutdown() { + if (connectionCache != null) connectionCache.shutdown(); + } + boolean supportsProxyuser() { return conf.getBoolean(HBASE_REST_SUPPORT_PROXYUSER, false); } http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java index 4226c3f..fc70ca4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java @@ -28,7 +28,7 @@ import org.apache.hadoop.util.StringUtils; /** * The Class HealthCheckChore for running health checker regularly. 
*/ - public class HealthCheckChore extends Chore { +public class HealthCheckChore extends ScheduledChore { private static Log LOG = LogFactory.getLog(HealthCheckChore.class); private HealthChecker healthChecker; private Configuration config; @@ -38,7 +38,7 @@ import org.apache.hadoop.util.StringUtils; private long startWindow; public HealthCheckChore(int sleepTime, Stoppable stopper, Configuration conf) { - super("HealthChecker", sleepTime, stopper); + super("HealthChecker", stopper, sleepTime); LOG.info("Health Check Chore runs every " + StringUtils.formatTime(sleepTime)); this.config = conf; String healthCheckScript = this.config.get(HConstants.HEALTH_SCRIPT_LOC); @@ -58,8 +58,8 @@ import org.apache.hadoop.util.StringUtils; if (!isHealthy) { boolean needToStop = decideToStop(); if (needToStop) { - this.stopper.stop("The node reported unhealthy " + threshold - + " number of times consecutively."); + getStopper().stop( + "The node reported unhealthy " + threshold + " number of times consecutively."); } // Always log health report. LOG.info("Health status at " + StringUtils.formatTime(System.currentTimeMillis()) + " : " http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java index 6b79f80..85f8471 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java @@ -65,4 +65,9 @@ public interface Server extends Abortable, Stoppable { * Get CoordinatedStateManager instance for this server. 
*/ CoordinatedStateManager getCoordinatedStateManager(); + + /** + * @return The {@link ChoreService} instance for this server + */ + ChoreService getChoreService(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java index 9f71b90..9d18c98 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java @@ -31,12 +31,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.HFileArchiver; @@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.util.Triple; * table on a period looking for unused regions to garbage collect. 
*/ @InterfaceAudience.Private -public class CatalogJanitor extends Chore { +public class CatalogJanitor extends ScheduledChore { private static final Log LOG = LogFactory.getLog(CatalogJanitor.class.getName()); private final Server server; private final MasterServices services; @@ -66,9 +66,8 @@ public class CatalogJanitor extends Chore { private final Connection connection; CatalogJanitor(final Server server, final MasterServices services) { - super("CatalogJanitor-" + server.getServerName().toShortString(), - server.getConfiguration().getInt("hbase.catalogjanitor.interval", 300000), - server); + super("CatalogJanitor-" + server.getServerName().toShortString(), server, server + .getConfiguration().getInt("hbase.catalogjanitor.interval", 300000)); this.server = server; this.services = services; this.connection = server.getConnection(); http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java index 6e7024c..e90aae6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java @@ -35,22 +35,7 @@ import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.handler.codec.MessageToMessageEncoder; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import io.netty.util.internal.StringUtil; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Chore; -import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import 
org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos; -import org.apache.hadoop.hbase.util.Addressing; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.ExceptionUtil; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.ReflectionUtils; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.util.VersionInfo; import java.io.Closeable; import java.io.IOException; @@ -67,6 +52,22 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos; +import org.apache.hadoop.hbase.util.Addressing; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ExceptionUtil; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.VersionInfo; + /** * Class to publish the cluster status to the client. This allows them to know immediately @@ -75,7 +76,7 @@ import java.util.concurrent.ConcurrentMap; * on the client the different timeouts, as the dead servers will be detected separately. */ @InterfaceAudience.Private -public class ClusterStatusPublisher extends Chore { +public class ClusterStatusPublisher extends ScheduledChore { /** * The implementation class used to publish the status. Default is null (no publish). 
* Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast the @@ -115,8 +116,8 @@ public class ClusterStatusPublisher extends Chore { public ClusterStatusPublisher(HMaster master, Configuration conf, Class<? extends Publisher> publisherClass) throws IOException { - super("HBase clusterStatusPublisher for " + master.getName(), - conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD), master); + super("HBase clusterStatusPublisher for " + master.getName(), master, conf.getInt( + STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD)); this.master = master; this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD); try {
