http://git-wip-us.apache.org/repos/asf/hbase-site/blob/b9b09fec/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.Testing.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.Testing.html b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.Testing.html index 9fd7e0b..fb34e18 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.Testing.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.Testing.html @@ -1923,188 +1923,193 @@ <span class="sourceLineNo">1915</span> return completed.size();<a name="line.1915"></a> <span class="sourceLineNo">1916</span> }<a name="line.1916"></a> <span class="sourceLineNo">1917</span><a name="line.1917"></a> -<span class="sourceLineNo">1918</span> // ==========================================================================<a name="line.1918"></a> -<span class="sourceLineNo">1919</span> // Worker Thread<a name="line.1919"></a> -<span class="sourceLineNo">1920</span> // ==========================================================================<a name="line.1920"></a> -<span class="sourceLineNo">1921</span> private class WorkerThread extends StoppableThread {<a name="line.1921"></a> -<span class="sourceLineNo">1922</span> private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);<a name="line.1922"></a> -<span class="sourceLineNo">1923</span> private volatile Procedure<TEnvironment> activeProcedure;<a name="line.1923"></a> -<span class="sourceLineNo">1924</span><a name="line.1924"></a> -<span class="sourceLineNo">1925</span> public WorkerThread(ThreadGroup group) {<a name="line.1925"></a> -<span class="sourceLineNo">1926</span> this(group, "PEWorker-");<a name="line.1926"></a> -<span class="sourceLineNo">1927</span> }<a name="line.1927"></a> -<span class="sourceLineNo">1928</span><a name="line.1928"></a> -<span class="sourceLineNo">1929</span> protected WorkerThread(ThreadGroup group, String prefix) {<a name="line.1929"></a> -<span class="sourceLineNo">1930</span> super(group, prefix + workerId.incrementAndGet());<a name="line.1930"></a> -<span class="sourceLineNo">1931</span> setDaemon(true);<a name="line.1931"></a> +<span class="sourceLineNo">1918</span> @VisibleForTesting<a name="line.1918"></a> +<span class="sourceLineNo">1919</span> public IdLock getProcExecutionLock() {<a name="line.1919"></a> +<span class="sourceLineNo">1920</span> return procExecutionLock;<a name="line.1920"></a> +<span class="sourceLineNo">1921</span> }<a name="line.1921"></a> +<span class="sourceLineNo">1922</span><a name="line.1922"></a> +<span class="sourceLineNo">1923</span> // ==========================================================================<a name="line.1923"></a> +<span class="sourceLineNo">1924</span> // Worker Thread<a name="line.1924"></a> +<span class="sourceLineNo">1925</span> // ==========================================================================<a name="line.1925"></a> +<span class="sourceLineNo">1926</span> private class WorkerThread extends StoppableThread {<a name="line.1926"></a> +<span class="sourceLineNo">1927</span> private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);<a name="line.1927"></a> +<span class="sourceLineNo">1928</span> private volatile Procedure<TEnvironment> activeProcedure;<a name="line.1928"></a> +<span class="sourceLineNo">1929</span><a name="line.1929"></a> +<span class="sourceLineNo">1930</span> public WorkerThread(ThreadGroup group) {<a name="line.1930"></a> +<span class="sourceLineNo">1931</span> this(group, "PEWorker-");<a name="line.1931"></a> <span class="sourceLineNo">1932</span> }<a name="line.1932"></a> <span class="sourceLineNo">1933</span><a name="line.1933"></a> -<span class="sourceLineNo">1934</span> @Override<a name="line.1934"></a> -<span class="sourceLineNo">1935</span> public void sendStopSignal() {<a name="line.1935"></a> -<span class="sourceLineNo">1936</span> scheduler.signalAll();<a name="line.1936"></a> +<span class="sourceLineNo">1934</span> protected WorkerThread(ThreadGroup group, String prefix) {<a name="line.1934"></a> +<span class="sourceLineNo">1935</span> super(group, prefix + workerId.incrementAndGet());<a name="line.1935"></a> +<span class="sourceLineNo">1936</span> setDaemon(true);<a name="line.1936"></a> <span class="sourceLineNo">1937</span> }<a name="line.1937"></a> -<span class="sourceLineNo">1938</span> @Override<a name="line.1938"></a> -<span class="sourceLineNo">1939</span> public void run() {<a name="line.1939"></a> -<span class="sourceLineNo">1940</span> long lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1940"></a> -<span class="sourceLineNo">1941</span> try {<a name="line.1941"></a> -<span class="sourceLineNo">1942</span> while (isRunning() && keepAlive(lastUpdate)) {<a name="line.1942"></a> -<span class="sourceLineNo">1943</span> @SuppressWarnings("unchecked")<a name="line.1943"></a> -<span class="sourceLineNo">1944</span> Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);<a name="line.1944"></a> -<span class="sourceLineNo">1945</span> if (proc == null) {<a name="line.1945"></a> -<span class="sourceLineNo">1946</span> continue;<a name="line.1946"></a> -<span class="sourceLineNo">1947</span> }<a name="line.1947"></a> -<span class="sourceLineNo">1948</span> this.activeProcedure = proc;<a name="line.1948"></a> -<span class="sourceLineNo">1949</span> int activeCount = activeExecutorCount.incrementAndGet();<a name="line.1949"></a> -<span class="sourceLineNo">1950</span> int runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1950"></a> -<span class="sourceLineNo">1951</span> LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),<a name="line.1951"></a> -<span class="sourceLineNo">1952</span> runningCount, activeCount);<a name="line.1952"></a> -<span class="sourceLineNo">1953</span> executionStartTime.set(EnvironmentEdgeManager.currentTime());<a name="line.1953"></a> -<span class="sourceLineNo">1954</span> IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());<a name="line.1954"></a> -<span class="sourceLineNo">1955</span> try {<a name="line.1955"></a> -<span class="sourceLineNo">1956</span> executeProcedure(proc);<a name="line.1956"></a> -<span class="sourceLineNo">1957</span> } catch (AssertionError e) {<a name="line.1957"></a> -<span class="sourceLineNo">1958</span> LOG.info("ASSERT pid=" + proc.getProcId(), e);<a name="line.1958"></a> -<span class="sourceLineNo">1959</span> throw e;<a name="line.1959"></a> -<span class="sourceLineNo">1960</span> } finally {<a name="line.1960"></a> -<span class="sourceLineNo">1961</span> procExecutionLock.releaseLockEntry(lockEntry);<a name="line.1961"></a> -<span class="sourceLineNo">1962</span> activeCount = activeExecutorCount.decrementAndGet();<a name="line.1962"></a> -<span class="sourceLineNo">1963</span> runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1963"></a> -<span class="sourceLineNo">1964</span> LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),<a name="line.1964"></a> -<span class="sourceLineNo">1965</span> runningCount, activeCount);<a name="line.1965"></a> -<span class="sourceLineNo">1966</span> this.activeProcedure = null;<a name="line.1966"></a> -<span class="sourceLineNo">1967</span> lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1967"></a> -<span class="sourceLineNo">1968</span> executionStartTime.set(Long.MAX_VALUE);<a name="line.1968"></a> -<span class="sourceLineNo">1969</span> }<a name="line.1969"></a> -<span class="sourceLineNo">1970</span> }<a name="line.1970"></a> -<span class="sourceLineNo">1971</span> } catch (Throwable t) {<a name="line.1971"></a> -<span class="sourceLineNo">1972</span> LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);<a name="line.1972"></a> -<span class="sourceLineNo">1973</span> } finally {<a name="line.1973"></a> -<span class="sourceLineNo">1974</span> LOG.trace("Worker terminated.");<a name="line.1974"></a> -<span class="sourceLineNo">1975</span> }<a name="line.1975"></a> -<span class="sourceLineNo">1976</span> workerThreads.remove(this);<a name="line.1976"></a> -<span class="sourceLineNo">1977</span> }<a name="line.1977"></a> -<span class="sourceLineNo">1978</span><a name="line.1978"></a> -<span class="sourceLineNo">1979</span> @Override<a name="line.1979"></a> -<span class="sourceLineNo">1980</span> public String toString() {<a name="line.1980"></a> -<span class="sourceLineNo">1981</span> Procedure<?> p = this.activeProcedure;<a name="line.1981"></a> -<span class="sourceLineNo">1982</span> return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")");<a name="line.1982"></a> -<span class="sourceLineNo">1983</span> }<a name="line.1983"></a> -<span class="sourceLineNo">1984</span><a name="line.1984"></a> -<span class="sourceLineNo">1985</span> /**<a name="line.1985"></a> -<span class="sourceLineNo">1986</span> * @return the time since the current procedure is running<a name="line.1986"></a> -<span class="sourceLineNo">1987</span> */<a name="line.1987"></a> -<span class="sourceLineNo">1988</span> public long getCurrentRunTime() {<a name="line.1988"></a> -<span class="sourceLineNo">1989</span> return EnvironmentEdgeManager.currentTime() - executionStartTime.get();<a name="line.1989"></a> -<span class="sourceLineNo">1990</span> }<a name="line.1990"></a> -<span class="sourceLineNo">1991</span><a name="line.1991"></a> -<span class="sourceLineNo">1992</span> // core worker never timeout<a name="line.1992"></a> -<span class="sourceLineNo">1993</span> protected boolean keepAlive(long lastUpdate) {<a name="line.1993"></a> -<span class="sourceLineNo">1994</span> return true;<a name="line.1994"></a> +<span class="sourceLineNo">1938</span><a name="line.1938"></a> +<span class="sourceLineNo">1939</span> @Override<a name="line.1939"></a> +<span class="sourceLineNo">1940</span> public void sendStopSignal() {<a name="line.1940"></a> +<span class="sourceLineNo">1941</span> scheduler.signalAll();<a name="line.1941"></a> +<span class="sourceLineNo">1942</span> }<a name="line.1942"></a> +<span class="sourceLineNo">1943</span> @Override<a name="line.1943"></a> +<span class="sourceLineNo">1944</span> public void run() {<a name="line.1944"></a> +<span class="sourceLineNo">1945</span> long lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1945"></a> +<span class="sourceLineNo">1946</span> try {<a name="line.1946"></a> +<span class="sourceLineNo">1947</span> while (isRunning() && keepAlive(lastUpdate)) {<a name="line.1947"></a> +<span class="sourceLineNo">1948</span> @SuppressWarnings("unchecked")<a name="line.1948"></a> +<span class="sourceLineNo">1949</span> Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);<a name="line.1949"></a> +<span class="sourceLineNo">1950</span> if (proc == null) {<a name="line.1950"></a> +<span class="sourceLineNo">1951</span> continue;<a name="line.1951"></a> +<span class="sourceLineNo">1952</span> }<a name="line.1952"></a> +<span class="sourceLineNo">1953</span> this.activeProcedure = proc;<a name="line.1953"></a> +<span class="sourceLineNo">1954</span> int activeCount = activeExecutorCount.incrementAndGet();<a name="line.1954"></a> +<span class="sourceLineNo">1955</span> int runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1955"></a> +<span class="sourceLineNo">1956</span> LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),<a name="line.1956"></a> +<span class="sourceLineNo">1957</span> runningCount, activeCount);<a name="line.1957"></a> +<span class="sourceLineNo">1958</span> executionStartTime.set(EnvironmentEdgeManager.currentTime());<a name="line.1958"></a> +<span class="sourceLineNo">1959</span> IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());<a name="line.1959"></a> +<span class="sourceLineNo">1960</span> try {<a name="line.1960"></a> +<span class="sourceLineNo">1961</span> executeProcedure(proc);<a name="line.1961"></a> +<span class="sourceLineNo">1962</span> } catch (AssertionError e) {<a name="line.1962"></a> +<span class="sourceLineNo">1963</span> LOG.info("ASSERT pid=" + proc.getProcId(), e);<a name="line.1963"></a> +<span class="sourceLineNo">1964</span> throw e;<a name="line.1964"></a> +<span class="sourceLineNo">1965</span> } finally {<a name="line.1965"></a> +<span class="sourceLineNo">1966</span> procExecutionLock.releaseLockEntry(lockEntry);<a name="line.1966"></a> +<span class="sourceLineNo">1967</span> activeCount = activeExecutorCount.decrementAndGet();<a name="line.1967"></a> +<span class="sourceLineNo">1968</span> runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1968"></a> +<span class="sourceLineNo">1969</span> LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),<a name="line.1969"></a> +<span class="sourceLineNo">1970</span> runningCount, activeCount);<a name="line.1970"></a> +<span class="sourceLineNo">1971</span> this.activeProcedure = null;<a name="line.1971"></a> +<span class="sourceLineNo">1972</span> lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1972"></a> +<span class="sourceLineNo">1973</span> executionStartTime.set(Long.MAX_VALUE);<a name="line.1973"></a> +<span class="sourceLineNo">1974</span> }<a name="line.1974"></a> +<span class="sourceLineNo">1975</span> }<a name="line.1975"></a> +<span class="sourceLineNo">1976</span> } catch (Throwable t) {<a name="line.1976"></a> +<span class="sourceLineNo">1977</span> LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);<a name="line.1977"></a> +<span class="sourceLineNo">1978</span> } finally {<a name="line.1978"></a> +<span class="sourceLineNo">1979</span> LOG.trace("Worker terminated.");<a name="line.1979"></a> +<span class="sourceLineNo">1980</span> }<a name="line.1980"></a> +<span class="sourceLineNo">1981</span> workerThreads.remove(this);<a name="line.1981"></a> +<span class="sourceLineNo">1982</span> }<a name="line.1982"></a> +<span class="sourceLineNo">1983</span><a name="line.1983"></a> +<span class="sourceLineNo">1984</span> @Override<a name="line.1984"></a> +<span class="sourceLineNo">1985</span> public String toString() {<a name="line.1985"></a> +<span class="sourceLineNo">1986</span> Procedure<?> p = this.activeProcedure;<a name="line.1986"></a> +<span class="sourceLineNo">1987</span> return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")");<a name="line.1987"></a> +<span class="sourceLineNo">1988</span> }<a name="line.1988"></a> +<span class="sourceLineNo">1989</span><a name="line.1989"></a> +<span class="sourceLineNo">1990</span> /**<a name="line.1990"></a> +<span class="sourceLineNo">1991</span> * @return the time since the current procedure is running<a name="line.1991"></a> +<span class="sourceLineNo">1992</span> */<a name="line.1992"></a> +<span class="sourceLineNo">1993</span> public long getCurrentRunTime() {<a name="line.1993"></a> +<span class="sourceLineNo">1994</span> return EnvironmentEdgeManager.currentTime() - executionStartTime.get();<a name="line.1994"></a> <span class="sourceLineNo">1995</span> }<a name="line.1995"></a> -<span class="sourceLineNo">1996</span> }<a name="line.1996"></a> -<span class="sourceLineNo">1997</span><a name="line.1997"></a> -<span class="sourceLineNo">1998</span> // A worker thread which can be added when core workers are stuck. Will timeout after<a name="line.1998"></a> -<span class="sourceLineNo">1999</span> // keepAliveTime if there is no procedure to run.<a name="line.1999"></a> -<span class="sourceLineNo">2000</span> private final class KeepAliveWorkerThread extends WorkerThread {<a name="line.2000"></a> -<span class="sourceLineNo">2001</span><a name="line.2001"></a> -<span class="sourceLineNo">2002</span> public KeepAliveWorkerThread(ThreadGroup group) {<a name="line.2002"></a> -<span class="sourceLineNo">2003</span> super(group, "KeepAlivePEWorker-");<a name="line.2003"></a> -<span class="sourceLineNo">2004</span> }<a name="line.2004"></a> -<span class="sourceLineNo">2005</span><a name="line.2005"></a> -<span class="sourceLineNo">2006</span> @Override<a name="line.2006"></a> -<span class="sourceLineNo">2007</span> protected boolean keepAlive(long lastUpdate) {<a name="line.2007"></a> -<span class="sourceLineNo">2008</span> return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime;<a name="line.2008"></a> +<span class="sourceLineNo">1996</span><a name="line.1996"></a> +<span class="sourceLineNo">1997</span> // core worker never timeout<a name="line.1997"></a> +<span class="sourceLineNo">1998</span> protected boolean keepAlive(long lastUpdate) {<a name="line.1998"></a> +<span class="sourceLineNo">1999</span> return true;<a name="line.1999"></a> +<span class="sourceLineNo">2000</span> }<a name="line.2000"></a> +<span class="sourceLineNo">2001</span> }<a name="line.2001"></a> +<span class="sourceLineNo">2002</span><a name="line.2002"></a> +<span class="sourceLineNo">2003</span> // A worker thread which can be added when core workers are stuck. Will timeout after<a name="line.2003"></a> +<span class="sourceLineNo">2004</span> // keepAliveTime if there is no procedure to run.<a name="line.2004"></a> +<span class="sourceLineNo">2005</span> private final class KeepAliveWorkerThread extends WorkerThread {<a name="line.2005"></a> +<span class="sourceLineNo">2006</span><a name="line.2006"></a> +<span class="sourceLineNo">2007</span> public KeepAliveWorkerThread(ThreadGroup group) {<a name="line.2007"></a> +<span class="sourceLineNo">2008</span> super(group, "KeepAlivePEWorker-");<a name="line.2008"></a> <span class="sourceLineNo">2009</span> }<a name="line.2009"></a> -<span class="sourceLineNo">2010</span> }<a name="line.2010"></a> -<span class="sourceLineNo">2011</span><a name="line.2011"></a> -<span class="sourceLineNo">2012</span> // ----------------------------------------------------------------------------<a name="line.2012"></a> -<span class="sourceLineNo">2013</span> // TODO-MAYBE: Should we provide a InlineChore to notify the store with the<a name="line.2013"></a> -<span class="sourceLineNo">2014</span> // full set of procedures pending and completed to write a compacted<a name="line.2014"></a> -<span class="sourceLineNo">2015</span> // version of the log (in case is a log)?<a name="line.2015"></a> -<span class="sourceLineNo">2016</span> // In theory no, procedures are have a short life, so at some point the store<a name="line.2016"></a> -<span class="sourceLineNo">2017</span> // will have the tracker saying everything is in the last log.<a name="line.2017"></a> -<span class="sourceLineNo">2018</span> // ----------------------------------------------------------------------------<a name="line.2018"></a> -<span class="sourceLineNo">2019</span><a name="line.2019"></a> -<span class="sourceLineNo">2020</span> private final class WorkerMonitor extends InlineChore {<a name="line.2020"></a> -<span class="sourceLineNo">2021</span> public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =<a name="line.2021"></a> -<span class="sourceLineNo">2022</span> "hbase.procedure.worker.monitor.interval.msec";<a name="line.2022"></a> -<span class="sourceLineNo">2023</span> private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec<a name="line.2023"></a> +<span class="sourceLineNo">2010</span><a name="line.2010"></a> +<span class="sourceLineNo">2011</span> @Override<a name="line.2011"></a> +<span class="sourceLineNo">2012</span> protected boolean keepAlive(long lastUpdate) {<a name="line.2012"></a> +<span class="sourceLineNo">2013</span> return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime;<a name="line.2013"></a> +<span class="sourceLineNo">2014</span> }<a name="line.2014"></a> +<span class="sourceLineNo">2015</span> }<a name="line.2015"></a> +<span class="sourceLineNo">2016</span><a name="line.2016"></a> +<span class="sourceLineNo">2017</span> // ----------------------------------------------------------------------------<a name="line.2017"></a> +<span class="sourceLineNo">2018</span> // TODO-MAYBE: Should we provide a InlineChore to notify the store with the<a name="line.2018"></a> +<span class="sourceLineNo">2019</span> // full set of procedures pending and completed to write a compacted<a name="line.2019"></a> +<span class="sourceLineNo">2020</span> // version of the log (in case is a log)?<a name="line.2020"></a> +<span class="sourceLineNo">2021</span> // In theory no, procedures are have a short life, so at some point the store<a name="line.2021"></a> +<span class="sourceLineNo">2022</span> // will have the tracker saying everything is in the last log.<a name="line.2022"></a> +<span class="sourceLineNo">2023</span> // ----------------------------------------------------------------------------<a name="line.2023"></a> <span class="sourceLineNo">2024</span><a name="line.2024"></a> -<span class="sourceLineNo">2025</span> public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =<a name="line.2025"></a> -<span class="sourceLineNo">2026</span> "hbase.procedure.worker.stuck.threshold.msec";<a name="line.2026"></a> -<span class="sourceLineNo">2027</span> private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec<a name="line.2027"></a> -<span class="sourceLineNo">2028</span><a name="line.2028"></a> -<span class="sourceLineNo">2029</span> public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =<a name="line.2029"></a> -<span class="sourceLineNo">2030</span> "hbase.procedure.worker.add.stuck.percentage";<a name="line.2030"></a> -<span class="sourceLineNo">2031</span> private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck<a name="line.2031"></a> -<span class="sourceLineNo">2032</span><a name="line.2032"></a> -<span class="sourceLineNo">2033</span> private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;<a name="line.2033"></a> -<span class="sourceLineNo">2034</span> private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;<a name="line.2034"></a> -<span class="sourceLineNo">2035</span> private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;<a name="line.2035"></a> -<span class="sourceLineNo">2036</span><a name="line.2036"></a> -<span class="sourceLineNo">2037</span> public WorkerMonitor() {<a name="line.2037"></a> -<span class="sourceLineNo">2038</span> refreshConfig();<a name="line.2038"></a> -<span class="sourceLineNo">2039</span> }<a name="line.2039"></a> -<span class="sourceLineNo">2040</span><a name="line.2040"></a> -<span class="sourceLineNo">2041</span> @Override<a name="line.2041"></a> -<span class="sourceLineNo">2042</span> public void run() {<a name="line.2042"></a> -<span class="sourceLineNo">2043</span> final int stuckCount = checkForStuckWorkers();<a name="line.2043"></a> -<span class="sourceLineNo">2044</span> checkThreadCount(stuckCount);<a name="line.2044"></a> +<span class="sourceLineNo">2025</span> private final class WorkerMonitor extends InlineChore {<a name="line.2025"></a> +<span class="sourceLineNo">2026</span> public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =<a name="line.2026"></a> +<span class="sourceLineNo">2027</span> "hbase.procedure.worker.monitor.interval.msec";<a name="line.2027"></a> +<span class="sourceLineNo">2028</span> private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec<a name="line.2028"></a> +<span class="sourceLineNo">2029</span><a name="line.2029"></a> +<span class="sourceLineNo">2030</span> public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =<a name="line.2030"></a> +<span class="sourceLineNo">2031</span> "hbase.procedure.worker.stuck.threshold.msec";<a name="line.2031"></a> +<span class="sourceLineNo">2032</span> private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec<a name="line.2032"></a> +<span class="sourceLineNo">2033</span><a name="line.2033"></a> +<span class="sourceLineNo">2034</span> public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =<a name="line.2034"></a> +<span class="sourceLineNo">2035</span> "hbase.procedure.worker.add.stuck.percentage";<a name="line.2035"></a> +<span class="sourceLineNo">2036</span> private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck<a name="line.2036"></a> +<span class="sourceLineNo">2037</span><a name="line.2037"></a> +<span class="sourceLineNo">2038</span> private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;<a name="line.2038"></a> +<span class="sourceLineNo">2039</span> private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;<a name="line.2039"></a> +<span class="sourceLineNo">2040</span> private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;<a name="line.2040"></a> +<span class="sourceLineNo">2041</span><a name="line.2041"></a> +<span class="sourceLineNo">2042</span> public WorkerMonitor() {<a name="line.2042"></a> +<span class="sourceLineNo">2043</span> refreshConfig();<a name="line.2043"></a> +<span class="sourceLineNo">2044</span> }<a name="line.2044"></a> <span class="sourceLineNo">2045</span><a name="line.2045"></a> -<span class="sourceLineNo">2046</span> // refresh interval (poor man dynamic conf update)<a name="line.2046"></a> -<span class="sourceLineNo">2047</span> refreshConfig();<a name="line.2047"></a> -<span class="sourceLineNo">2048</span> }<a name="line.2048"></a> -<span class="sourceLineNo">2049</span><a name="line.2049"></a> -<span class="sourceLineNo">2050</span> private int checkForStuckWorkers() {<a name="line.2050"></a> -<span class="sourceLineNo">2051</span> // check if any of the worker is stuck<a name="line.2051"></a> -<span class="sourceLineNo">2052</span> int stuckCount = 0;<a name="line.2052"></a> -<span class="sourceLineNo">2053</span> for (WorkerThread worker : workerThreads) {<a name="line.2053"></a> -<span class="sourceLineNo">2054</span> if (worker.getCurrentRunTime() < stuckThreshold) {<a name="line.2054"></a> -<span class="sourceLineNo">2055</span> continue;<a name="line.2055"></a> -<span class="sourceLineNo">2056</span> }<a name="line.2056"></a> -<span class="sourceLineNo">2057</span><a name="line.2057"></a> -<span class="sourceLineNo">2058</span> // WARN the worker is stuck<a name="line.2058"></a> -<span class="sourceLineNo">2059</span> stuckCount++;<a name="line.2059"></a> -<span class="sourceLineNo">2060</span> LOG.warn("Worker stuck {}, run time {}", worker,<a name="line.2060"></a> -<span class="sourceLineNo">2061</span> StringUtils.humanTimeDiff(worker.getCurrentRunTime()));<a name="line.2061"></a> -<span class="sourceLineNo">2062</span> }<a name="line.2062"></a> -<span class="sourceLineNo">2063</span> return stuckCount;<a name="line.2063"></a> -<span class="sourceLineNo">2064</span> }<a name="line.2064"></a> -<span class="sourceLineNo">2065</span><a name="line.2065"></a> -<span class="sourceLineNo">2066</span> private void checkThreadCount(final int stuckCount) {<a name="line.2066"></a> -<span class="sourceLineNo">2067</span> // nothing to do if there are no runnable tasks<a name="line.2067"></a> -<span class="sourceLineNo">2068</span> if (stuckCount < 1 || !scheduler.hasRunnables()) {<a name="line.2068"></a> -<span class="sourceLineNo">2069</span> return;<a name="line.2069"></a> -<span class="sourceLineNo">2070</span> }<a name="line.2070"></a> -<span class="sourceLineNo">2071</span><a name="line.2071"></a> -<span class="sourceLineNo">2072</span> // add a new thread if the worker stuck percentage exceed the threshold limit<a name="line.2072"></a> -<span class="sourceLineNo">2073</span> // and every handler is active.<a name="line.2073"></a> -<span class="sourceLineNo">2074</span> final float stuckPerc = ((float) stuckCount) / workerThreads.size();<a name="line.2074"></a> -<span class="sourceLineNo">2075</span> // let's add new worker thread more aggressively, as they will timeout finally if there is no<a name="line.2075"></a> -<span class="sourceLineNo">2076</span> // work to do.<a name="line.2076"></a> -<span class="sourceLineNo">2077</span> if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) {<a name="line.2077"></a> -<span class="sourceLineNo">2078</span> final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);<a name="line.2078"></a> -<span class="sourceLineNo">2079</span> workerThreads.add(worker);<a name="line.2079"></a> -<span class="sourceLineNo">2080</span> worker.start();<a name="line.2080"></a> -<span class="sourceLineNo">2081</span> LOG.debug("Added new worker thread {}", worker);<a name="line.2081"></a> -<span class="sourceLineNo">2082</span> }<a name="line.2082"></a> -<span class="sourceLineNo">2083</span> }<a name="line.2083"></a> -<span class="sourceLineNo">2084</span><a name="line.2084"></a> -<span class="sourceLineNo">2085</span> private void refreshConfig() {<a name="line.2085"></a> -<span class="sourceLineNo">2086</span> addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,<a name="line.2086"></a> -<span class="sourceLineNo">2087</span> DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);<a name="line.2087"></a> -<span class="sourceLineNo">2088</span> timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,<a name="line.2088"></a> -<span class="sourceLineNo">2089</span> DEFAULT_WORKER_MONITOR_INTERVAL);<a name="line.2089"></a> -<span class="sourceLineNo">2090</span> stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,<a name="line.2090"></a> -<span class="sourceLineNo">2091</span> DEFAULT_WORKER_STUCK_THRESHOLD);<a name="line.2091"></a> -<span class="sourceLineNo">2092</span> }<a name="line.2092"></a> -<span class="sourceLineNo">2093</span><a name="line.2093"></a> -<span class="sourceLineNo">2094</span> @Override<a name="line.2094"></a> -<span class="sourceLineNo">2095</span> public int getTimeoutInterval() {<a name="line.2095"></a> -<span class="sourceLineNo">2096</span> return timeoutInterval;<a name="line.2096"></a> +<span class="sourceLineNo">2046</span> @Override<a name="line.2046"></a> +<span class="sourceLineNo">2047</span> public void run() {<a name="line.2047"></a> +<span class="sourceLineNo">2048</span> final int stuckCount = checkForStuckWorkers();<a name="line.2048"></a> +<span class="sourceLineNo">2049</span> checkThreadCount(stuckCount);<a name="line.2049"></a> +<span class="sourceLineNo">2050</span><a name="line.2050"></a> +<span class="sourceLineNo">2051</span> // refresh interval (poor man dynamic conf update)<a name="line.2051"></a> +<span class="sourceLineNo">2052</span> refreshConfig();<a name="line.2052"></a> +<span class="sourceLineNo">2053</span> }<a name="line.2053"></a> +<span class="sourceLineNo">2054</span><a name="line.2054"></a> +<span class="sourceLineNo">2055</span> private int checkForStuckWorkers() {<a name="line.2055"></a> +<span class="sourceLineNo">2056</span> // check if any of the worker is stuck<a name="line.2056"></a> +<span class="sourceLineNo">2057</span> int stuckCount = 0;<a name="line.2057"></a> +<span class="sourceLineNo">2058</span> for (WorkerThread worker : workerThreads) {<a name="line.2058"></a> +<span class="sourceLineNo">2059</span> if (worker.getCurrentRunTime() < stuckThreshold) {<a name="line.2059"></a> +<span class="sourceLineNo">2060</span> continue;<a name="line.2060"></a> +<span class="sourceLineNo">2061</span> }<a name="line.2061"></a> +<span class="sourceLineNo">2062</span><a name="line.2062"></a> +<span class="sourceLineNo">2063</span> // WARN the worker is stuck<a name="line.2063"></a> +<span class="sourceLineNo">2064</span> stuckCount++;<a name="line.2064"></a> +<span class="sourceLineNo">2065</span> LOG.warn("Worker stuck {}, run time {}", worker,<a name="line.2065"></a> +<span class="sourceLineNo">2066</span> StringUtils.humanTimeDiff(worker.getCurrentRunTime()));<a name="line.2066"></a> +<span class="sourceLineNo">2067</span> }<a name="line.2067"></a> +<span class="sourceLineNo">2068</span> return stuckCount;<a name="line.2068"></a> +<span class="sourceLineNo">2069</span> }<a name="line.2069"></a> +<span class="sourceLineNo">2070</span><a name="line.2070"></a> +<span class="sourceLineNo">2071</span> private void checkThreadCount(final int stuckCount) {<a name="line.2071"></a> +<span class="sourceLineNo">2072</span> // nothing to do if there are no runnable tasks<a name="line.2072"></a> +<span class="sourceLineNo">2073</span> if (stuckCount < 1 || !scheduler.hasRunnables()) {<a name="line.2073"></a> +<span class="sourceLineNo">2074</span> return;<a name="line.2074"></a> +<span class="sourceLineNo">2075</span> }<a name="line.2075"></a> +<span class="sourceLineNo">2076</span><a name="line.2076"></a> +<span class="sourceLineNo">2077</span> // add a new thread if the worker stuck percentage exceed the threshold limit<a name="line.2077"></a> +<span class="sourceLineNo">2078</span> // and every handler is active.<a name="line.2078"></a> +<span class="sourceLineNo">2079</span> final float stuckPerc = ((float) stuckCount) / workerThreads.size();<a name="line.2079"></a> +<span class="sourceLineNo">2080</span> // let's add new worker thread more aggressively, as they will timeout finally if there is no<a name="line.2080"></a> +<span class="sourceLineNo">2081</span> // work to do.<a name="line.2081"></a> +<span class="sourceLineNo">2082</span> if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) {<a name="line.2082"></a> +<span class="sourceLineNo">2083</span> final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);<a name="line.2083"></a> +<span class="sourceLineNo">2084</span> workerThreads.add(worker);<a name="line.2084"></a> +<span class="sourceLineNo">2085</span> worker.start();<a name="line.2085"></a> +<span class="sourceLineNo">2086</span> LOG.debug("Added new worker thread {}", worker);<a name="line.2086"></a> +<span class="sourceLineNo">2087</span> }<a name="line.2087"></a> +<span class="sourceLineNo">2088</span> }<a name="line.2088"></a> +<span class="sourceLineNo">2089</span><a name="line.2089"></a> +<span class="sourceLineNo">2090</span> private void refreshConfig() {<a name="line.2090"></a> +<span class="sourceLineNo">2091</span> addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,<a name="line.2091"></a> +<span class="sourceLineNo">2092</span> DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);<a name="line.2092"></a> +<span class="sourceLineNo">2093</span> timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,<a name="line.2093"></a> +<span class="sourceLineNo">2094</span> DEFAULT_WORKER_MONITOR_INTERVAL);<a name="line.2094"></a> +<span class="sourceLineNo">2095</span> stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,<a name="line.2095"></a> +<span class="sourceLineNo">2096</span> DEFAULT_WORKER_STUCK_THRESHOLD);<a name="line.2096"></a> <span class="sourceLineNo">2097</span> }<a name="line.2097"></a> -<span class="sourceLineNo">2098</span> }<a name="line.2098"></a> -<span class="sourceLineNo">2099</span>}<a name="line.2099"></a> +<span class="sourceLineNo">2098</span><a name="line.2098"></a> +<span class="sourceLineNo">2099</span> @Override<a name="line.2099"></a> +<span class="sourceLineNo">2100</span> public int getTimeoutInterval() {<a name="line.2100"></a> +<span class="sourceLineNo">2101</span> return timeoutInterval;<a name="line.2101"></a> +<span class="sourceLineNo">2102</span> }<a name="line.2102"></a> +<span class="sourceLineNo">2103</span> }<a name="line.2103"></a> +<span class="sourceLineNo">2104</span>}<a name="line.2104"></a>
http://git-wip-us.apache.org/repos/asf/hbase-site/blob/b9b09fec/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.WorkerMonitor.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.WorkerMonitor.html b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.WorkerMonitor.html index 9fd7e0b..fb34e18 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.WorkerMonitor.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.WorkerMonitor.html @@ -1923,188 +1923,193 @@ <span class="sourceLineNo">1915</span> return completed.size();<a name="line.1915"></a> <span class="sourceLineNo">1916</span> }<a name="line.1916"></a> <span class="sourceLineNo">1917</span><a name="line.1917"></a> -<span class="sourceLineNo">1918</span> // ==========================================================================<a name="line.1918"></a> -<span class="sourceLineNo">1919</span> // Worker Thread<a name="line.1919"></a> -<span class="sourceLineNo">1920</span> // ==========================================================================<a name="line.1920"></a> -<span class="sourceLineNo">1921</span> private class WorkerThread extends StoppableThread {<a name="line.1921"></a> -<span class="sourceLineNo">1922</span> private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);<a name="line.1922"></a> -<span class="sourceLineNo">1923</span> private volatile Procedure<TEnvironment> activeProcedure;<a name="line.1923"></a> -<span class="sourceLineNo">1924</span><a name="line.1924"></a> -<span class="sourceLineNo">1925</span> public WorkerThread(ThreadGroup group) {<a name="line.1925"></a> -<span class="sourceLineNo">1926</span> this(group, "PEWorker-");<a name="line.1926"></a> -<span class="sourceLineNo">1927</span> }<a name="line.1927"></a> -<span class="sourceLineNo">1928</span><a name="line.1928"></a> -<span class="sourceLineNo">1929</span> protected WorkerThread(ThreadGroup group, String prefix) {<a name="line.1929"></a> -<span class="sourceLineNo">1930</span> super(group, prefix + workerId.incrementAndGet());<a name="line.1930"></a> -<span class="sourceLineNo">1931</span> setDaemon(true);<a name="line.1931"></a> +<span class="sourceLineNo">1918</span> @VisibleForTesting<a name="line.1918"></a> +<span class="sourceLineNo">1919</span> public IdLock getProcExecutionLock() {<a name="line.1919"></a> +<span class="sourceLineNo">1920</span> return procExecutionLock;<a name="line.1920"></a> +<span class="sourceLineNo">1921</span> }<a name="line.1921"></a> +<span class="sourceLineNo">1922</span><a name="line.1922"></a> +<span class="sourceLineNo">1923</span> // ==========================================================================<a name="line.1923"></a> +<span class="sourceLineNo">1924</span> // Worker Thread<a name="line.1924"></a> +<span class="sourceLineNo">1925</span> // ==========================================================================<a name="line.1925"></a> +<span class="sourceLineNo">1926</span> private class WorkerThread extends StoppableThread {<a name="line.1926"></a> +<span class="sourceLineNo">1927</span> private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);<a name="line.1927"></a> +<span class="sourceLineNo">1928</span> private volatile Procedure<TEnvironment> activeProcedure;<a name="line.1928"></a> +<span class="sourceLineNo">1929</span><a name="line.1929"></a> +<span class="sourceLineNo">1930</span> public WorkerThread(ThreadGroup group) {<a name="line.1930"></a> +<span class="sourceLineNo">1931</span> this(group, "PEWorker-");<a name="line.1931"></a> <span class="sourceLineNo">1932</span> }<a name="line.1932"></a> <span class="sourceLineNo">1933</span><a name="line.1933"></a> -<span class="sourceLineNo">1934</span> @Override<a name="line.1934"></a> -<span class="sourceLineNo">1935</span> public void sendStopSignal() {<a name="line.1935"></a> -<span class="sourceLineNo">1936</span> scheduler.signalAll();<a name="line.1936"></a> +<span class="sourceLineNo">1934</span> protected WorkerThread(ThreadGroup group, String prefix) {<a name="line.1934"></a> +<span class="sourceLineNo">1935</span> super(group, prefix + workerId.incrementAndGet());<a name="line.1935"></a> +<span class="sourceLineNo">1936</span> setDaemon(true);<a name="line.1936"></a> <span class="sourceLineNo">1937</span> }<a name="line.1937"></a> -<span class="sourceLineNo">1938</span> @Override<a name="line.1938"></a> -<span class="sourceLineNo">1939</span> public void run() {<a name="line.1939"></a> -<span class="sourceLineNo">1940</span> long lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1940"></a> -<span class="sourceLineNo">1941</span> try {<a name="line.1941"></a> -<span class="sourceLineNo">1942</span> while (isRunning() && keepAlive(lastUpdate)) {<a name="line.1942"></a> -<span class="sourceLineNo">1943</span> @SuppressWarnings("unchecked")<a name="line.1943"></a> -<span class="sourceLineNo">1944</span> Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);<a name="line.1944"></a> -<span class="sourceLineNo">1945</span> if (proc == null) {<a name="line.1945"></a> -<span class="sourceLineNo">1946</span> continue;<a name="line.1946"></a> -<span class="sourceLineNo">1947</span> }<a name="line.1947"></a> -<span class="sourceLineNo">1948</span> this.activeProcedure = proc;<a name="line.1948"></a> -<span class="sourceLineNo">1949</span> int activeCount = activeExecutorCount.incrementAndGet();<a name="line.1949"></a> -<span class="sourceLineNo">1950</span> int runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1950"></a> -<span class="sourceLineNo">1951</span> LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),<a name="line.1951"></a> -<span class="sourceLineNo">1952</span> runningCount, activeCount);<a name="line.1952"></a> -<span class="sourceLineNo">1953</span> executionStartTime.set(EnvironmentEdgeManager.currentTime());<a name="line.1953"></a> -<span class="sourceLineNo">1954</span> IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());<a name="line.1954"></a> -<span class="sourceLineNo">1955</span> try {<a name="line.1955"></a> -<span class="sourceLineNo">1956</span> executeProcedure(proc);<a name="line.1956"></a> -<span class="sourceLineNo">1957</span> } catch (AssertionError e) {<a name="line.1957"></a> -<span class="sourceLineNo">1958</span> LOG.info("ASSERT pid=" + proc.getProcId(), e);<a name="line.1958"></a> -<span class="sourceLineNo">1959</span> throw e;<a name="line.1959"></a> -<span class="sourceLineNo">1960</span> } finally {<a name="line.1960"></a> -<span class="sourceLineNo">1961</span> procExecutionLock.releaseLockEntry(lockEntry);<a name="line.1961"></a> -<span class="sourceLineNo">1962</span> activeCount = activeExecutorCount.decrementAndGet();<a name="line.1962"></a> -<span class="sourceLineNo">1963</span> runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1963"></a> -<span class="sourceLineNo">1964</span> LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),<a name="line.1964"></a> -<span class="sourceLineNo">1965</span> runningCount, activeCount);<a name="line.1965"></a> -<span class="sourceLineNo">1966</span> this.activeProcedure = null;<a name="line.1966"></a> -<span class="sourceLineNo">1967</span> lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1967"></a> -<span class="sourceLineNo">1968</span> executionStartTime.set(Long.MAX_VALUE);<a name="line.1968"></a> -<span class="sourceLineNo">1969</span> }<a name="line.1969"></a> -<span class="sourceLineNo">1970</span> }<a name="line.1970"></a> -<span class="sourceLineNo">1971</span> } catch (Throwable t) {<a name="line.1971"></a> -<span class="sourceLineNo">1972</span> LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);<a name="line.1972"></a> -<span class="sourceLineNo">1973</span> } finally {<a name="line.1973"></a> -<span class="sourceLineNo">1974</span> LOG.trace("Worker terminated.");<a name="line.1974"></a> -<span class="sourceLineNo">1975</span> }<a name="line.1975"></a> -<span class="sourceLineNo">1976</span> workerThreads.remove(this);<a name="line.1976"></a> -<span class="sourceLineNo">1977</span> }<a name="line.1977"></a> -<span class="sourceLineNo">1978</span><a name="line.1978"></a> -<span class="sourceLineNo">1979</span> @Override<a name="line.1979"></a> -<span class="sourceLineNo">1980</span> public String toString() {<a name="line.1980"></a> -<span class="sourceLineNo">1981</span> Procedure<?> p = this.activeProcedure;<a name="line.1981"></a> -<span class="sourceLineNo">1982</span> return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")");<a name="line.1982"></a> -<span class="sourceLineNo">1983</span> }<a name="line.1983"></a> -<span class="sourceLineNo">1984</span><a name="line.1984"></a> -<span class="sourceLineNo">1985</span> /**<a name="line.1985"></a> -<span class="sourceLineNo">1986</span> * @return the time since the current procedure is running<a name="line.1986"></a> -<span class="sourceLineNo">1987</span> */<a name="line.1987"></a> -<span class="sourceLineNo">1988</span> public long getCurrentRunTime() {<a name="line.1988"></a> -<span class="sourceLineNo">1989</span> return EnvironmentEdgeManager.currentTime() - executionStartTime.get();<a name="line.1989"></a> -<span class="sourceLineNo">1990</span> }<a name="line.1990"></a> -<span class="sourceLineNo">1991</span><a name="line.1991"></a> -<span class="sourceLineNo">1992</span> // core worker never timeout<a name="line.1992"></a> -<span class="sourceLineNo">1993</span> protected boolean keepAlive(long lastUpdate) {<a name="line.1993"></a> -<span class="sourceLineNo">1994</span> return true;<a name="line.1994"></a> +<span class="sourceLineNo">1938</span><a name="line.1938"></a> +<span class="sourceLineNo">1939</span> @Override<a name="line.1939"></a> +<span class="sourceLineNo">1940</span> public void sendStopSignal() {<a name="line.1940"></a> +<span class="sourceLineNo">1941</span> scheduler.signalAll();<a name="line.1941"></a> +<span class="sourceLineNo">1942</span> }<a name="line.1942"></a> +<span class="sourceLineNo">1943</span> @Override<a name="line.1943"></a> +<span class="sourceLineNo">1944</span> public void run() {<a name="line.1944"></a> +<span class="sourceLineNo">1945</span> long lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1945"></a> +<span class="sourceLineNo">1946</span> try {<a name="line.1946"></a> +<span class="sourceLineNo">1947</span> while (isRunning() && keepAlive(lastUpdate)) {<a name="line.1947"></a> +<span class="sourceLineNo">1948</span> @SuppressWarnings("unchecked")<a name="line.1948"></a> +<span class="sourceLineNo">1949</span> Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);<a name="line.1949"></a> +<span class="sourceLineNo">1950</span> if (proc == null) {<a name="line.1950"></a> +<span class="sourceLineNo">1951</span> continue;<a name="line.1951"></a> +<span class="sourceLineNo">1952</span> }<a name="line.1952"></a> +<span class="sourceLineNo">1953</span> this.activeProcedure = proc;<a name="line.1953"></a> +<span class="sourceLineNo">1954</span> int activeCount = activeExecutorCount.incrementAndGet();<a name="line.1954"></a> +<span class="sourceLineNo">1955</span> int runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1955"></a> +<span class="sourceLineNo">1956</span> LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),<a name="line.1956"></a> +<span class="sourceLineNo">1957</span> runningCount, activeCount);<a name="line.1957"></a> +<span class="sourceLineNo">1958</span> executionStartTime.set(EnvironmentEdgeManager.currentTime());<a name="line.1958"></a> +<span class="sourceLineNo">1959</span> IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());<a name="line.1959"></a> +<span class="sourceLineNo">1960</span> try {<a name="line.1960"></a> +<span class="sourceLineNo">1961</span> executeProcedure(proc);<a name="line.1961"></a> +<span class="sourceLineNo">1962</span> } catch (AssertionError e) {<a name="line.1962"></a> +<span class="sourceLineNo">1963</span> LOG.info("ASSERT pid=" + proc.getProcId(), e);<a name="line.1963"></a> +<span class="sourceLineNo">1964</span> throw e;<a name="line.1964"></a> +<span class="sourceLineNo">1965</span> } finally {<a name="line.1965"></a> +<span class="sourceLineNo">1966</span> procExecutionLock.releaseLockEntry(lockEntry);<a name="line.1966"></a> +<span class="sourceLineNo">1967</span> activeCount = activeExecutorCount.decrementAndGet();<a name="line.1967"></a> +<span class="sourceLineNo">1968</span> runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1968"></a> +<span class="sourceLineNo">1969</span> LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),<a name="line.1969"></a> +<span class="sourceLineNo">1970</span> runningCount, activeCount);<a name="line.1970"></a> +<span class="sourceLineNo">1971</span> this.activeProcedure = null;<a name="line.1971"></a> +<span class="sourceLineNo">1972</span> lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1972"></a> +<span class="sourceLineNo">1973</span> executionStartTime.set(Long.MAX_VALUE);<a name="line.1973"></a> +<span class="sourceLineNo">1974</span> }<a name="line.1974"></a> +<span class="sourceLineNo">1975</span> }<a name="line.1975"></a> +<span class="sourceLineNo">1976</span> } catch (Throwable t) {<a name="line.1976"></a> +<span class="sourceLineNo">1977</span> LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);<a name="line.1977"></a> +<span class="sourceLineNo">1978</span> } finally {<a name="line.1978"></a> +<span class="sourceLineNo">1979</span> LOG.trace("Worker terminated.");<a name="line.1979"></a> +<span class="sourceLineNo">1980</span> }<a name="line.1980"></a> +<span class="sourceLineNo">1981</span> workerThreads.remove(this);<a name="line.1981"></a> +<span class="sourceLineNo">1982</span> }<a name="line.1982"></a> +<span class="sourceLineNo">1983</span><a name="line.1983"></a> +<span class="sourceLineNo">1984</span> @Override<a name="line.1984"></a> +<span class="sourceLineNo">1985</span> public String toString() {<a name="line.1985"></a> +<span class="sourceLineNo">1986</span> Procedure<?> p = this.activeProcedure;<a name="line.1986"></a> +<span class="sourceLineNo">1987</span> return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")");<a name="line.1987"></a> +<span class="sourceLineNo">1988</span> }<a name="line.1988"></a> +<span class="sourceLineNo">1989</span><a name="line.1989"></a> +<span class="sourceLineNo">1990</span> /**<a name="line.1990"></a> +<span class="sourceLineNo">1991</span> * @return the time since the current procedure is running<a name="line.1991"></a> +<span class="sourceLineNo">1992</span> */<a name="line.1992"></a> +<span class="sourceLineNo">1993</span> public long getCurrentRunTime() {<a name="line.1993"></a> +<span class="sourceLineNo">1994</span> return EnvironmentEdgeManager.currentTime() - executionStartTime.get();<a name="line.1994"></a> <span class="sourceLineNo">1995</span> }<a name="line.1995"></a> -<span class="sourceLineNo">1996</span> }<a name="line.1996"></a> -<span class="sourceLineNo">1997</span><a name="line.1997"></a> -<span class="sourceLineNo">1998</span> // A worker thread which can be added when core workers are stuck. Will timeout after<a name="line.1998"></a> -<span class="sourceLineNo">1999</span> // keepAliveTime if there is no procedure to run.<a name="line.1999"></a> -<span class="sourceLineNo">2000</span> private final class KeepAliveWorkerThread extends WorkerThread {<a name="line.2000"></a> -<span class="sourceLineNo">2001</span><a name="line.2001"></a> -<span class="sourceLineNo">2002</span> public KeepAliveWorkerThread(ThreadGroup group) {<a name="line.2002"></a> -<span class="sourceLineNo">2003</span> super(group, "KeepAlivePEWorker-");<a name="line.2003"></a> -<span class="sourceLineNo">2004</span> }<a name="line.2004"></a> -<span class="sourceLineNo">2005</span><a name="line.2005"></a> -<span class="sourceLineNo">2006</span> @Override<a name="line.2006"></a> -<span class="sourceLineNo">2007</span> protected boolean keepAlive(long lastUpdate) {<a name="line.2007"></a> -<span class="sourceLineNo">2008</span> return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime;<a name="line.2008"></a> +<span class="sourceLineNo">1996</span><a name="line.1996"></a> +<span class="sourceLineNo">1997</span> // core worker never timeout<a name="line.1997"></a> +<span class="sourceLineNo">1998</span> protected boolean keepAlive(long lastUpdate) {<a name="line.1998"></a> +<span class="sourceLineNo">1999</span> return true;<a name="line.1999"></a> +<span class="sourceLineNo">2000</span> }<a name="line.2000"></a> +<span class="sourceLineNo">2001</span> }<a name="line.2001"></a> +<span class="sourceLineNo">2002</span><a name="line.2002"></a> +<span class="sourceLineNo">2003</span> // A worker thread which can be added when core workers are stuck. Will timeout after<a name="line.2003"></a> +<span class="sourceLineNo">2004</span> // keepAliveTime if there is no procedure to run.<a name="line.2004"></a> +<span class="sourceLineNo">2005</span> private final class KeepAliveWorkerThread extends WorkerThread {<a name="line.2005"></a> +<span class="sourceLineNo">2006</span><a name="line.2006"></a> +<span class="sourceLineNo">2007</span> public KeepAliveWorkerThread(ThreadGroup group) {<a name="line.2007"></a> +<span class="sourceLineNo">2008</span> super(group, "KeepAlivePEWorker-");<a name="line.2008"></a> <span class="sourceLineNo">2009</span> }<a name="line.2009"></a> -<span class="sourceLineNo">2010</span> }<a name="line.2010"></a> -<span class="sourceLineNo">2011</span><a name="line.2011"></a> -<span class="sourceLineNo">2012</span> // ----------------------------------------------------------------------------<a name="line.2012"></a> -<span class="sourceLineNo">2013</span> // TODO-MAYBE: Should we provide a InlineChore to notify the store with the<a name="line.2013"></a> -<span class="sourceLineNo">2014</span> // full set of procedures pending and completed to write a compacted<a name="line.2014"></a> -<span class="sourceLineNo">2015</span> // version of the log (in case is a log)?<a name="line.2015"></a> -<span class="sourceLineNo">2016</span> // In theory no, procedures are have a short life, so at some point the store<a name="line.2016"></a> -<span class="sourceLineNo">2017</span> // will have the tracker saying everything is in the last log.<a name="line.2017"></a> -<span class="sourceLineNo">2018</span> // ----------------------------------------------------------------------------<a name="line.2018"></a> -<span class="sourceLineNo">2019</span><a name="line.2019"></a> -<span class="sourceLineNo">2020</span> private final class WorkerMonitor extends InlineChore {<a name="line.2020"></a> -<span class="sourceLineNo">2021</span> public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =<a name="line.2021"></a> -<span class="sourceLineNo">2022</span> "hbase.procedure.worker.monitor.interval.msec";<a name="line.2022"></a> -<span class="sourceLineNo">2023</span> private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec<a name="line.2023"></a> +<span class="sourceLineNo">2010</span><a name="line.2010"></a> +<span class="sourceLineNo">2011</span> @Override<a name="line.2011"></a> +<span class="sourceLineNo">2012</span> protected boolean keepAlive(long lastUpdate) {<a name="line.2012"></a> +<span class="sourceLineNo">2013</span> return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime;<a name="line.2013"></a> +<span class="sourceLineNo">2014</span> }<a name="line.2014"></a> +<span class="sourceLineNo">2015</span> }<a name="line.2015"></a> +<span class="sourceLineNo">2016</span><a name="line.2016"></a> +<span class="sourceLineNo">2017</span> // ----------------------------------------------------------------------------<a name="line.2017"></a> +<span class="sourceLineNo">2018</span> // TODO-MAYBE: Should we provide a InlineChore to notify the store with the<a name="line.2018"></a> +<span class="sourceLineNo">2019</span> // full set of procedures pending and completed to write a compacted<a name="line.2019"></a> +<span class="sourceLineNo">2020</span> // version of the log (in case is a log)?<a name="line.2020"></a> +<span class="sourceLineNo">2021</span> // In theory no, procedures are have a short life, so at some point the store<a name="line.2021"></a> +<span class="sourceLineNo">2022</span> // will have the tracker saying everything is in the last log.<a name="line.2022"></a> +<span class="sourceLineNo">2023</span> // ----------------------------------------------------------------------------<a name="line.2023"></a> <span class="sourceLineNo">2024</span><a name="line.2024"></a> -<span class="sourceLineNo">2025</span> public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =<a name="line.2025"></a> -<span class="sourceLineNo">2026</span> "hbase.procedure.worker.stuck.threshold.msec";<a name="line.2026"></a> -<span class="sourceLineNo">2027</span> private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec<a name="line.2027"></a> -<span class="sourceLineNo">2028</span><a name="line.2028"></a> -<span class="sourceLineNo">2029</span> public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =<a name="line.2029"></a> -<span class="sourceLineNo">2030</span> "hbase.procedure.worker.add.stuck.percentage";<a name="line.2030"></a> -<span class="sourceLineNo">2031</span> private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck<a name="line.2031"></a> -<span class="sourceLineNo">2032</span><a name="line.2032"></a> -<span class="sourceLineNo">2033</span> private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;<a name="line.2033"></a> -<span class="sourceLineNo">2034</span> private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;<a name="line.2034"></a> -<span class="sourceLineNo">2035</span> private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;<a name="line.2035"></a> -<span class="sourceLineNo">2036</span><a name="line.2036"></a> -<span class="sourceLineNo">2037</span> public WorkerMonitor() {<a name="line.2037"></a> -<span class="sourceLineNo">2038</span> refreshConfig();<a name="line.2038"></a> -<span class="sourceLineNo">2039</span> }<a name="line.2039"></a> -<span class="sourceLineNo">2040</span><a name="line.2040"></a> -<span class="sourceLineNo">2041</span> @Override<a name="line.2041"></a> -<span class="sourceLineNo">2042</span> public void run() {<a name="line.2042"></a> -<span class="sourceLineNo">2043</span> final int stuckCount = checkForStuckWorkers();<a name="line.2043"></a> -<span class="sourceLineNo">2044</span> checkThreadCount(stuckCount);<a name="line.2044"></a> +<span class="sourceLineNo">2025</span> private final class WorkerMonitor extends InlineChore {<a name="line.2025"></a> +<span class="sourceLineNo">2026</span> public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =<a name="line.2026"></a> +<span class="sourceLineNo">2027</span> "hbase.procedure.worker.monitor.interval.msec";<a name="line.2027"></a> +<span class="sourceLineNo">2028</span> private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec<a name="line.2028"></a> +<span class="sourceLineNo">2029</span><a name="line.2029"></a> +<span class="sourceLineNo">2030</span> public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =<a name="line.2030"></a> +<span class="sourceLineNo">2031</span> "hbase.procedure.worker.stuck.threshold.msec";<a name="line.2031"></a> +<span class="sourceLineNo">2032</span> private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec<a name="line.2032"></a> +<span class="sourceLineNo">2033</span><a name="line.2033"></a> +<span class="sourceLineNo">2034</span> public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =<a name="line.2034"></a> +<span class="sourceLineNo">2035</span> "hbase.procedure.worker.add.stuck.percentage";<a name="line.2035"></a> +<span class="sourceLineNo">2036</span> private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck<a name="line.2036"></a> +<span class="sourceLineNo">2037</span><a name="line.2037"></a> +<span class="sourceLineNo">2038</span> private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;<a name="line.2038"></a> +<span class="sourceLineNo">2039</span> private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;<a name="line.2039"></a> +<span class="sourceLineNo">2040</span> private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;<a name="line.2040"></a> +<span class="sourceLineNo">2041</span><a name="line.2041"></a> +<span class="sourceLineNo">2042</span> public WorkerMonitor() {<a name="line.2042"></a> +<span class="sourceLineNo">2043</span> refreshConfig();<a name="line.2043"></a> +<span class="sourceLineNo">2044</span> }<a name="line.2044"></a> <span class="sourceLineNo">2045</span><a name="line.2045"></a> -<span class="sourceLineNo">2046</span> // refresh interval (poor man dynamic conf update)<a name="line.2046"></a> -<span class="sourceLineNo">2047</span> refreshConfig();<a name="line.2047"></a> -<span class="sourceLineNo">2048</span> }<a name="line.2048"></a> -<span class="sourceLineNo">2049</span><a name="line.2049"></a> -<span class="sourceLineNo">2050</span> private int checkForStuckWorkers() {<a name="line.2050"></a> -<span class="sourceLineNo">2051</span> // check if any of the worker is stuck<a name="line.2051"></a> -<span class="sourceLineNo">2052</span> int stuckCount = 0;<a name="line.2052"></a> -<span class="sourceLineNo">2053</span> for (WorkerThread worker : workerThreads) {<a name="line.2053"></a> -<span class="sourceLineNo">2054</span> if (worker.getCurrentRunTime() < stuckThreshold) {<a name="line.2054"></a> -<span class="sourceLineNo">2055</span> continue;<a name="line.2055"></a> -<span class="sourceLineNo">2056</span> }<a name="line.2056"></a> -<span class="sourceLineNo">2057</span><a name="line.2057"></a> -<span class="sourceLineNo">2058</span> // WARN the worker is stuck<a name="line.2058"></a> -<span class="sourceLineNo">2059</span> stuckCount++;<a name="line.2059"></a> -<span class="sourceLineNo">2060</span> LOG.warn("Worker stuck {}, run time {}", worker,<a name="line.2060"></a> -<span class="sourceLineNo">2061</span> StringUtils.humanTimeDiff(worker.getCurrentRunTime()));<a name="line.2061"></a> -<span class="sourceLineNo">2062</span> }<a name="line.2062"></a> -<span class="sourceLineNo">2063</span> return stuckCount;<a name="line.2063"></a> -<span class="sourceLineNo">2064</span> }<a name="line.2064"></a> -<span class="sourceLineNo">2065</span><a name="line.2065"></a> -<span class="sourceLineNo">2066</span> private void checkThreadCount(final int stuckCount) {<a name="line.2066"></a> -<span class="sourceLineNo">2067</span> // nothing to do if there are no runnable tasks<a name="line.2067"></a> -<span class="sourceLineNo">2068</span> if (stuckCount < 1 || !scheduler.hasRunnables()) {<a name="line.2068"></a> -<span class="sourceLineNo">2069</span> return;<a name="line.2069"></a> -<span class="sourceLineNo">2070</span> }<a name="line.2070"></a> -<span class="sourceLineNo">2071</span><a name="line.2071"></a> -<span class="sourceLineNo">2072</span> // add a new thread if the worker stuck percentage exceed the threshold limit<a name="line.2072"></a> -<span class="sourceLineNo">2073</span> // and every handler is active.<a name="line.2073"></a> -<span class="sourceLineNo">2074</span> final float stuckPerc = ((float) stuckCount) / workerThreads.size();<a name="line.2074"></a> -<span class="sourceLineNo">2075</span> // let's add new worker thread more aggressively, as they will timeout finally if there is no<a name="line.2075"></a> -<span class="sourceLineNo">2076</span> // work to do.<a name="line.2076"></a> -<span class="sourceLineNo">2077</span> if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) {<a name="line.2077"></a> -<span class="sourceLineNo">2078</span> final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);<a name="line.2078"></a> -<span class="sourceLineNo">2079</span> workerThreads.add(worker);<a name="line.2079"></a> -<span class="sourceLineNo">2080</span> worker.start();<a name="line.2080"></a> -<span class="sourceLineNo">2081</span> LOG.debug("Added new worker thread {}", worker);<a name="line.2081"></a> -<span class="sourceLineNo">2082</span> }<a name="line.2082"></a> -<span class="sourceLineNo">2083</span> }<a name="line.2083"></a> -<span class="sourceLineNo">2084</span><a name="line.2084"></a> -<span class="sourceLineNo">2085</span> private void refreshConfig() {<a name="line.2085"></a> -<span class="sourceLineNo">2086</span> addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,<a name="line.2086"></a> -<span class="sourceLineNo">2087</span> DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);<a name="line.2087"></a> -<span class="sourceLineNo">2088</span> timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,<a name="line.2088"></a> -<span class="sourceLineNo">2089</span> DEFAULT_WORKER_MONITOR_INTERVAL);<a name="line.2089"></a> -<span class="sourceLineNo">2090</span> stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,<a name="line.2090"></a> -<span class="sourceLineNo">2091</span> DEFAULT_WORKER_STUCK_THRESHOLD);<a name="line.2091"></a> -<span class="sourceLineNo">2092</span> }<a name="line.2092"></a> -<span class="sourceLineNo">2093</span><a name="line.2093"></a> -<span class="sourceLineNo">2094</span> @Override<a name="line.2094"></a> -<span class="sourceLineNo">2095</span> public int getTimeoutInterval() {<a name="line.2095"></a> -<span class="sourceLineNo">2096</span> return timeoutInterval;<a name="line.2096"></a> +<span class="sourceLineNo">2046</span> @Override<a name="line.2046"></a> +<span class="sourceLineNo">2047</span> public void run() {<a name="line.2047"></a> +<span class="sourceLineNo">2048</span> final int stuckCount = checkForStuckWorkers();<a name="line.2048"></a> +<span class="sourceLineNo">2049</span> checkThreadCount(stuckCount);<a name="line.2049"></a> +<span class="sourceLineNo">2050</span><a name="line.2050"></a> +<span class="sourceLineNo">2051</span> // refresh interval (poor man dynamic conf update)<a name="line.2051"></a> +<span class="sourceLineNo">2052</span> refreshConfig();<a name="line.2052"></a> +<span class="sourceLineNo">2053</span> }<a name="line.2053"></a> +<span class="sourceLineNo">2054</span><a name="line.2054"></a> +<span class="sourceLineNo">2055</span> private int checkForStuckWorkers() {<a name="line.2055"></a> +<span class="sourceLineNo">2056</span> // check if any of the worker is stuck<a name="line.2056"></a> +<span class="sourceLineNo">2057</span> int stuckCount = 0;<a name="line.2057"></a> +<span class="sourceLineNo">2058</span> for (WorkerThread worker : workerThreads) {<a name="line.2058"></a> +<span class="sourceLineNo">2059</span> if (worker.getCurrentRunTime() < stuckThreshold) {<a name="line.2059"></a> +<span class="sourceLineNo">2060</span> continue;<a name="line.2060"></a> +<span class="sourceLineNo">2061</span> }<a name="line.2061"></a> +<span class="sourceLineNo">2062</span><a name="line.2062"></a> +<span class="sourceLineNo">2063</span> // WARN the worker is stuck<a name="line.2063"></a> +<span class="sourceLineNo">2064</span> stuckCount++;<a name="line.2064"></a> +<span class="sourceLineNo">2065</span> LOG.warn("Worker stuck {}, run time {}", worker,<a name="line.2065"></a> +<span class="sourceLineNo">2066</span> StringUtils.humanTimeDiff(worker.getCurrentRunTime()));<a name="line.2066"></a> +<span class="sourceLineNo">2067</span> }<a name="line.2067"></a> +<span class="sourceLineNo">2068</span> return stuckCount;<a name="line.2068"></a> +<span class="sourceLineNo">2069</span> }<a name="line.2069"></a> +<span class="sourceLineNo">2070</span><a name="line.2070"></a> +<span class="sourceLineNo">2071</span> private void checkThreadCount(final int stuckCount) {<a name="line.2071"></a> +<span class="sourceLineNo">2072</span> // nothing to do if there are no runnable tasks<a name="line.2072"></a> +<span class="sourceLineNo">2073</span> if (stuckCount < 1 || !scheduler.hasRunnables()) {<a name="line.2073"></a> +<span class="sourceLineNo">2074</span> return;<a name="line.2074"></a> +<span class="sourceLineNo">2075</span> }<a name="line.2075"></a> +<span class="sourceLineNo">2076</span><a name="line.2076"></a> +<span class="sourceLineNo">2077</span> // add a new thread if the worker stuck percentage exceed the threshold limit<a name="line.2077"></a> +<span class="sourceLineNo">2078</span> // and every handler is active.<a name="line.2078"></a> +<span class="sourceLineNo">2079</span> final float stuckPerc = ((float) stuckCount) / workerThreads.size();<a name="line.2079"></a> +<span class="sourceLineNo">2080</span> // let's add new worker thread more aggressively, as they will timeout finally if there is no<a name="line.2080"></a> +<span class="sourceLineNo">2081</span> // work to do.<a name="line.2081"></a> +<span class="sourceLineNo">2082</span> if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) {<a name="line.2082"></a> +<span class="sourceLineNo">2083</span> final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);<a name="line.2083"></a> +<span class="sourceLineNo">2084</span> workerThreads.add(worker);<a name="line.2084"></a> +<span class="sourceLineNo">2085</span> worker.start();<a name="line.2085"></a> +<span class="sourceLineNo">2086</span> LOG.debug("Added new worker thread {}", worker);<a name="line.2086"></a> +<span class="sourceLineNo">2087</span> }<a name="line.2087"></a> +<span class="sourceLineNo">2088</span> }<a name="line.2088"></a> +<span class="sourceLineNo">2089</span><a name="line.2089"></a> +<span class="sourceLineNo">2090</span> private void refreshConfig() {<a name="line.2090"></a> +<span class="sourceLineNo">2091</span> addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,<a name="line.2091"></a> +<span class="sourceLineNo">2092</span> DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);<a name="line.2092"></a> +<span class="sourceLineNo">2093</span> timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,<a name="line.2093"></a> +<span class="sourceLineNo">2094</span> DEFAULT_WORKER_MONITOR_INTERVAL);<a name="line.2094"></a> +<span class="sourceLineNo">2095</span> stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,<a name="line.2095"></a> +<span class="sourceLineNo">2096</span> DEFAULT_WORKER_STUCK_THRESHOLD);<a name="line.2096"></a> <span class="sourceLineNo">2097</span> }<a name="line.2097"></a> -<span class="sourceLineNo">2098</span> }<a name="line.2098"></a> -<span class="sourceLineNo">2099</span>}<a name="line.2099"></a> +<span class="sourceLineNo">2098</span><a name="line.2098"></a> +<span class="sourceLineNo">2099</span> @Override<a name="line.2099"></a> +<span class="sourceLineNo">2100</span> public int getTimeoutInterval() {<a name="line.2100"></a> +<span class="sourceLineNo">2101</span> return timeoutInterval;<a name="line.2101"></a> +<span class="sourceLineNo">2102</span> }<a name="line.2102"></a> +<span class="sourceLineNo">2103</span> }<a name="line.2103"></a> +<span class="sourceLineNo">2104</span>}<a name="line.2104"></a>