http://git-wip-us.apache.org/repos/asf/hbase-site/blob/14db89d7/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.html
----------------------------------------------------------------------
diff --git
a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.html
b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.html
index b50a65f..7271567 100644
---
a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.html
+++
b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.html
@@ -1718,312 +1718,314 @@
<span class="sourceLineNo">1710</span><a name="line.1710"></a>
<span class="sourceLineNo">1711</span> public WorkerThread(final
ThreadGroup group) {<a name="line.1711"></a>
<span class="sourceLineNo">1712</span> super(group, "ProcExecWrkr-" +
workerId.incrementAndGet());<a name="line.1712"></a>
-<span class="sourceLineNo">1713</span> }<a name="line.1713"></a>
-<span class="sourceLineNo">1714</span><a name="line.1714"></a>
-<span class="sourceLineNo">1715</span> @Override<a name="line.1715"></a>
-<span class="sourceLineNo">1716</span> public void sendStopSignal() {<a
name="line.1716"></a>
-<span class="sourceLineNo">1717</span> scheduler.signalAll();<a
name="line.1717"></a>
-<span class="sourceLineNo">1718</span> }<a name="line.1718"></a>
-<span class="sourceLineNo">1719</span><a name="line.1719"></a>
-<span class="sourceLineNo">1720</span> @Override<a name="line.1720"></a>
-<span class="sourceLineNo">1721</span> public void run() {<a
name="line.1721"></a>
-<span class="sourceLineNo">1722</span> long lastUpdate =
EnvironmentEdgeManager.currentTime();<a name="line.1722"></a>
-<span class="sourceLineNo">1723</span> try {<a name="line.1723"></a>
-<span class="sourceLineNo">1724</span> while (isRunning() &&
keepAlive(lastUpdate)) {<a name="line.1724"></a>
-<span class="sourceLineNo">1725</span> this.activeProcedure =
scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);<a name="line.1725"></a>
-<span class="sourceLineNo">1726</span> if (this.activeProcedure ==
null) continue;<a name="line.1726"></a>
-<span class="sourceLineNo">1727</span> int activeCount =
activeExecutorCount.incrementAndGet();<a name="line.1727"></a>
-<span class="sourceLineNo">1728</span> int runningCount =
store.setRunningProcedureCount(activeCount);<a name="line.1728"></a>
-<span class="sourceLineNo">1729</span> if (LOG.isTraceEnabled()) {<a
name="line.1729"></a>
-<span class="sourceLineNo">1730</span> LOG.trace("Execute pid=" +
this.activeProcedure.getProcId() +<a name="line.1730"></a>
-<span class="sourceLineNo">1731</span> " runningCount=" +
runningCount + ", activeCount=" + activeCount);<a name="line.1731"></a>
-<span class="sourceLineNo">1732</span> }<a name="line.1732"></a>
-<span class="sourceLineNo">1733</span>
executionStartTime.set(EnvironmentEdgeManager.currentTime());<a
name="line.1733"></a>
-<span class="sourceLineNo">1734</span> try {<a name="line.1734"></a>
-<span class="sourceLineNo">1735</span>
executeProcedure(this.activeProcedure);<a name="line.1735"></a>
-<span class="sourceLineNo">1736</span> } catch (AssertionError e) {<a
name="line.1736"></a>
-<span class="sourceLineNo">1737</span> LOG.info("ASSERT pid=" +
this.activeProcedure.getProcId(), e);<a name="line.1737"></a>
-<span class="sourceLineNo">1738</span> throw e;<a
name="line.1738"></a>
-<span class="sourceLineNo">1739</span> } finally {<a
name="line.1739"></a>
-<span class="sourceLineNo">1740</span> activeCount =
activeExecutorCount.decrementAndGet();<a name="line.1740"></a>
-<span class="sourceLineNo">1741</span> runningCount =
store.setRunningProcedureCount(activeCount);<a name="line.1741"></a>
-<span class="sourceLineNo">1742</span> if (LOG.isTraceEnabled())
{<a name="line.1742"></a>
-<span class="sourceLineNo">1743</span> LOG.trace("Halt pid=" +
this.activeProcedure.getProcId() +<a name="line.1743"></a>
-<span class="sourceLineNo">1744</span> " runningCount=" +
runningCount + ", activeCount=" + activeCount);<a name="line.1744"></a>
-<span class="sourceLineNo">1745</span> }<a name="line.1745"></a>
-<span class="sourceLineNo">1746</span> this.activeProcedure =
null;<a name="line.1746"></a>
-<span class="sourceLineNo">1747</span> lastUpdate =
EnvironmentEdgeManager.currentTime();<a name="line.1747"></a>
-<span class="sourceLineNo">1748</span>
executionStartTime.set(Long.MAX_VALUE);<a name="line.1748"></a>
-<span class="sourceLineNo">1749</span> }<a name="line.1749"></a>
-<span class="sourceLineNo">1750</span> }<a name="line.1750"></a>
-<span class="sourceLineNo">1751</span> } catch (Throwable t) {<a
name="line.1751"></a>
-<span class="sourceLineNo">1752</span> LOG.warn("Worker terminating
UNNATURALLY " + this.activeProcedure, t);<a name="line.1752"></a>
-<span class="sourceLineNo">1753</span> } finally {<a name="line.1753"></a>
-<span class="sourceLineNo">1754</span> LOG.debug("Worker
terminated.");<a name="line.1754"></a>
-<span class="sourceLineNo">1755</span> }<a name="line.1755"></a>
-<span class="sourceLineNo">1756</span> workerThreads.remove(this);<a
name="line.1756"></a>
-<span class="sourceLineNo">1757</span> }<a name="line.1757"></a>
-<span class="sourceLineNo">1758</span><a name="line.1758"></a>
-<span class="sourceLineNo">1759</span> @Override<a name="line.1759"></a>
-<span class="sourceLineNo">1760</span> public String toString() {<a
name="line.1760"></a>
-<span class="sourceLineNo">1761</span> Procedure<?> p =
this.activeProcedure;<a name="line.1761"></a>
-<span class="sourceLineNo">1762</span> return getName() + "(pid=" + (p ==
null? Procedure.NO_PROC_ID: p.getProcId() + ")");<a name="line.1762"></a>
-<span class="sourceLineNo">1763</span> }<a name="line.1763"></a>
-<span class="sourceLineNo">1764</span><a name="line.1764"></a>
-<span class="sourceLineNo">1765</span> /**<a name="line.1765"></a>
-<span class="sourceLineNo">1766</span> * @return the time since the
current procedure is running<a name="line.1766"></a>
-<span class="sourceLineNo">1767</span> */<a name="line.1767"></a>
-<span class="sourceLineNo">1768</span> public long getCurrentRunTime() {<a
name="line.1768"></a>
-<span class="sourceLineNo">1769</span> return
EnvironmentEdgeManager.currentTime() - executionStartTime.get();<a
name="line.1769"></a>
-<span class="sourceLineNo">1770</span> }<a name="line.1770"></a>
-<span class="sourceLineNo">1771</span><a name="line.1771"></a>
-<span class="sourceLineNo">1772</span> private boolean keepAlive(final long
lastUpdate) {<a name="line.1772"></a>
-<span class="sourceLineNo">1773</span> if (workerThreads.size() <=
corePoolSize) return true;<a name="line.1773"></a>
-<span class="sourceLineNo">1774</span> return
(EnvironmentEdgeManager.currentTime() - lastUpdate) < keepAliveTime;<a
name="line.1774"></a>
-<span class="sourceLineNo">1775</span> }<a name="line.1775"></a>
-<span class="sourceLineNo">1776</span> }<a name="line.1776"></a>
-<span class="sourceLineNo">1777</span><a name="line.1777"></a>
-<span class="sourceLineNo">1778</span> /**<a name="line.1778"></a>
-<span class="sourceLineNo">1779</span> * Runs task on a period such as check
for stuck workers.<a name="line.1779"></a>
-<span class="sourceLineNo">1780</span> * @see InlineChore<a
name="line.1780"></a>
-<span class="sourceLineNo">1781</span> */<a name="line.1781"></a>
-<span class="sourceLineNo">1782</span> private final class
TimeoutExecutorThread extends StoppableThread {<a name="line.1782"></a>
-<span class="sourceLineNo">1783</span> private final
DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();<a
name="line.1783"></a>
-<span class="sourceLineNo">1784</span><a name="line.1784"></a>
-<span class="sourceLineNo">1785</span> public TimeoutExecutorThread(final
ThreadGroup group) {<a name="line.1785"></a>
-<span class="sourceLineNo">1786</span> super(group, "ProcExecTimeout");<a
name="line.1786"></a>
-<span class="sourceLineNo">1787</span> }<a name="line.1787"></a>
-<span class="sourceLineNo">1788</span><a name="line.1788"></a>
-<span class="sourceLineNo">1789</span> @Override<a name="line.1789"></a>
-<span class="sourceLineNo">1790</span> public void sendStopSignal() {<a
name="line.1790"></a>
-<span class="sourceLineNo">1791</span>
queue.add(DelayedUtil.DELAYED_POISON);<a name="line.1791"></a>
-<span class="sourceLineNo">1792</span> }<a name="line.1792"></a>
-<span class="sourceLineNo">1793</span><a name="line.1793"></a>
-<span class="sourceLineNo">1794</span> @Override<a name="line.1794"></a>
-<span class="sourceLineNo">1795</span> public void run() {<a
name="line.1795"></a>
-<span class="sourceLineNo">1796</span> final boolean traceEnabled =
LOG.isTraceEnabled();<a name="line.1796"></a>
-<span class="sourceLineNo">1797</span> while (isRunning()) {<a
name="line.1797"></a>
-<span class="sourceLineNo">1798</span> final DelayedWithTimeout task =
DelayedUtil.takeWithoutInterrupt(queue);<a name="line.1798"></a>
-<span class="sourceLineNo">1799</span> if (task == null || task ==
DelayedUtil.DELAYED_POISON) {<a name="line.1799"></a>
-<span class="sourceLineNo">1800</span> // the executor may be
shutting down,<a name="line.1800"></a>
-<span class="sourceLineNo">1801</span> // and the task is just the
shutdown request<a name="line.1801"></a>
-<span class="sourceLineNo">1802</span> continue;<a
name="line.1802"></a>
-<span class="sourceLineNo">1803</span> }<a name="line.1803"></a>
-<span class="sourceLineNo">1804</span><a name="line.1804"></a>
-<span class="sourceLineNo">1805</span> if (traceEnabled) {<a
name="line.1805"></a>
-<span class="sourceLineNo">1806</span> LOG.trace("Executing " +
task);<a name="line.1806"></a>
-<span class="sourceLineNo">1807</span> }<a name="line.1807"></a>
-<span class="sourceLineNo">1808</span><a name="line.1808"></a>
-<span class="sourceLineNo">1809</span> // execute the task<a
name="line.1809"></a>
-<span class="sourceLineNo">1810</span> if (task instanceof InlineChore)
{<a name="line.1810"></a>
-<span class="sourceLineNo">1811</span>
execInlineChore((InlineChore)task);<a name="line.1811"></a>
-<span class="sourceLineNo">1812</span> } else if (task instanceof
DelayedProcedure) {<a name="line.1812"></a>
-<span class="sourceLineNo">1813</span>
execDelayedProcedure((DelayedProcedure)task);<a name="line.1813"></a>
-<span class="sourceLineNo">1814</span> } else {<a name="line.1814"></a>
-<span class="sourceLineNo">1815</span> LOG.error("CODE-BUG unknown
timeout task type " + task);<a name="line.1815"></a>
-<span class="sourceLineNo">1816</span> }<a name="line.1816"></a>
-<span class="sourceLineNo">1817</span> }<a name="line.1817"></a>
-<span class="sourceLineNo">1818</span> }<a name="line.1818"></a>
-<span class="sourceLineNo">1819</span><a name="line.1819"></a>
-<span class="sourceLineNo">1820</span> public void add(final InlineChore
chore) {<a name="line.1820"></a>
-<span class="sourceLineNo">1821</span> chore.refreshTimeout();<a
name="line.1821"></a>
-<span class="sourceLineNo">1822</span> queue.add(chore);<a
name="line.1822"></a>
-<span class="sourceLineNo">1823</span> }<a name="line.1823"></a>
-<span class="sourceLineNo">1824</span><a name="line.1824"></a>
-<span class="sourceLineNo">1825</span> public void add(final Procedure
procedure) {<a name="line.1825"></a>
-<span class="sourceLineNo">1826</span> assert procedure.getState() ==
ProcedureState.WAITING_TIMEOUT;<a name="line.1826"></a>
-<span class="sourceLineNo">1827</span> LOG.info("ADDED " + procedure + ";
timeout=" + procedure.getTimeout() +<a name="line.1827"></a>
-<span class="sourceLineNo">1828</span> ", timestamp=" +
procedure.getTimeoutTimestamp());<a name="line.1828"></a>
-<span class="sourceLineNo">1829</span> queue.add(new
DelayedProcedure(procedure));<a name="line.1829"></a>
-<span class="sourceLineNo">1830</span> }<a name="line.1830"></a>
-<span class="sourceLineNo">1831</span><a name="line.1831"></a>
-<span class="sourceLineNo">1832</span> public boolean remove(final
Procedure procedure) {<a name="line.1832"></a>
-<span class="sourceLineNo">1833</span> return queue.remove(new
DelayedProcedure(procedure));<a name="line.1833"></a>
-<span class="sourceLineNo">1834</span> }<a name="line.1834"></a>
-<span class="sourceLineNo">1835</span><a name="line.1835"></a>
-<span class="sourceLineNo">1836</span> private void execInlineChore(final
InlineChore chore) {<a name="line.1836"></a>
-<span class="sourceLineNo">1837</span> chore.run();<a
name="line.1837"></a>
-<span class="sourceLineNo">1838</span> add(chore);<a name="line.1838"></a>
-<span class="sourceLineNo">1839</span> }<a name="line.1839"></a>
-<span class="sourceLineNo">1840</span><a name="line.1840"></a>
-<span class="sourceLineNo">1841</span> private void
execDelayedProcedure(final DelayedProcedure delayed) {<a name="line.1841"></a>
-<span class="sourceLineNo">1842</span> // TODO: treat this as a normal
procedure, add it to the scheduler and<a name="line.1842"></a>
-<span class="sourceLineNo">1843</span> // let one of the workers handle
it.<a name="line.1843"></a>
-<span class="sourceLineNo">1844</span> // Today we consider
ProcedureInMemoryChore as InlineChores<a name="line.1844"></a>
-<span class="sourceLineNo">1845</span> final Procedure procedure =
delayed.getObject();<a name="line.1845"></a>
-<span class="sourceLineNo">1846</span> if (procedure instanceof
ProcedureInMemoryChore) {<a name="line.1846"></a>
-<span class="sourceLineNo">1847</span>
executeInMemoryChore((ProcedureInMemoryChore)procedure);<a name="line.1847"></a>
-<span class="sourceLineNo">1848</span> // if the procedure is in a
waiting state again, put it back in the queue<a name="line.1848"></a>
-<span class="sourceLineNo">1849</span> procedure.updateTimestamp();<a
name="line.1849"></a>
-<span class="sourceLineNo">1850</span> if (procedure.isWaiting()) {<a
name="line.1850"></a>
-<span class="sourceLineNo">1851</span>
delayed.setTimeout(procedure.getTimeoutTimestamp());<a name="line.1851"></a>
-<span class="sourceLineNo">1852</span> queue.add(delayed);<a
name="line.1852"></a>
-<span class="sourceLineNo">1853</span> }<a name="line.1853"></a>
-<span class="sourceLineNo">1854</span> } else {<a name="line.1854"></a>
-<span class="sourceLineNo">1855</span>
executeTimedoutProcedure(procedure);<a name="line.1855"></a>
-<span class="sourceLineNo">1856</span> }<a name="line.1856"></a>
-<span class="sourceLineNo">1857</span> }<a name="line.1857"></a>
-<span class="sourceLineNo">1858</span><a name="line.1858"></a>
-<span class="sourceLineNo">1859</span> private void
executeInMemoryChore(final ProcedureInMemoryChore chore) {<a
name="line.1859"></a>
-<span class="sourceLineNo">1860</span> if (!chore.isWaiting()) return;<a
name="line.1860"></a>
-<span class="sourceLineNo">1861</span><a name="line.1861"></a>
-<span class="sourceLineNo">1862</span> // The ProcedureInMemoryChore is a
special case, and it acts as a chore.<a name="line.1862"></a>
-<span class="sourceLineNo">1863</span> // instead of bringing the Chore
class in, we reuse this timeout thread for<a name="line.1863"></a>
-<span class="sourceLineNo">1864</span> // this special case.<a
name="line.1864"></a>
-<span class="sourceLineNo">1865</span> try {<a name="line.1865"></a>
-<span class="sourceLineNo">1866</span>
chore.periodicExecute(getEnvironment());<a name="line.1866"></a>
-<span class="sourceLineNo">1867</span> } catch (Throwable e) {<a
name="line.1867"></a>
-<span class="sourceLineNo">1868</span> LOG.error("Ignoring " + chore +
" exception: " + e.getMessage(), e);<a name="line.1868"></a>
-<span class="sourceLineNo">1869</span> }<a name="line.1869"></a>
-<span class="sourceLineNo">1870</span> }<a name="line.1870"></a>
-<span class="sourceLineNo">1871</span><a name="line.1871"></a>
-<span class="sourceLineNo">1872</span> private void
executeTimedoutProcedure(final Procedure proc) {<a name="line.1872"></a>
-<span class="sourceLineNo">1873</span> // The procedure received a
timeout. if the procedure itself does not handle it,<a name="line.1873"></a>
-<span class="sourceLineNo">1874</span> // call abort() and add the
procedure back in the queue for rollback.<a name="line.1874"></a>
-<span class="sourceLineNo">1875</span> if
(proc.setTimeoutFailure(getEnvironment())) {<a name="line.1875"></a>
-<span class="sourceLineNo">1876</span> long rootProcId =
Procedure.getRootProcedureId(procedures, proc);<a name="line.1876"></a>
-<span class="sourceLineNo">1877</span> RootProcedureState procStack =
rollbackStack.get(rootProcId);<a name="line.1877"></a>
-<span class="sourceLineNo">1878</span> procStack.abort();<a
name="line.1878"></a>
-<span class="sourceLineNo">1879</span> store.update(proc);<a
name="line.1879"></a>
-<span class="sourceLineNo">1880</span> scheduler.addFront(proc);<a
name="line.1880"></a>
-<span class="sourceLineNo">1881</span> }<a name="line.1881"></a>
-<span class="sourceLineNo">1882</span> }<a name="line.1882"></a>
-<span class="sourceLineNo">1883</span> }<a name="line.1883"></a>
-<span class="sourceLineNo">1884</span><a name="line.1884"></a>
-<span class="sourceLineNo">1885</span> private static final class
DelayedProcedure<a name="line.1885"></a>
-<span class="sourceLineNo">1886</span> extends
DelayedUtil.DelayedContainerWithTimestamp<Procedure> {<a
name="line.1886"></a>
-<span class="sourceLineNo">1887</span> public DelayedProcedure(final
Procedure procedure) {<a name="line.1887"></a>
-<span class="sourceLineNo">1888</span> super(procedure,
procedure.getTimeoutTimestamp());<a name="line.1888"></a>
-<span class="sourceLineNo">1889</span> }<a name="line.1889"></a>
-<span class="sourceLineNo">1890</span> }<a name="line.1890"></a>
-<span class="sourceLineNo">1891</span><a name="line.1891"></a>
-<span class="sourceLineNo">1892</span> private static abstract class
StoppableThread extends Thread {<a name="line.1892"></a>
-<span class="sourceLineNo">1893</span> public StoppableThread(final
ThreadGroup group, final String name) {<a name="line.1893"></a>
-<span class="sourceLineNo">1894</span> super(group, name);<a
name="line.1894"></a>
-<span class="sourceLineNo">1895</span> }<a name="line.1895"></a>
-<span class="sourceLineNo">1896</span><a name="line.1896"></a>
-<span class="sourceLineNo">1897</span> public abstract void
sendStopSignal();<a name="line.1897"></a>
+<span class="sourceLineNo">1713</span> setDaemon(true);<a
name="line.1713"></a>
+<span class="sourceLineNo">1714</span> }<a name="line.1714"></a>
+<span class="sourceLineNo">1715</span><a name="line.1715"></a>
+<span class="sourceLineNo">1716</span> @Override<a name="line.1716"></a>
+<span class="sourceLineNo">1717</span> public void sendStopSignal() {<a
name="line.1717"></a>
+<span class="sourceLineNo">1718</span> scheduler.signalAll();<a
name="line.1718"></a>
+<span class="sourceLineNo">1719</span> }<a name="line.1719"></a>
+<span class="sourceLineNo">1720</span><a name="line.1720"></a>
+<span class="sourceLineNo">1721</span> @Override<a name="line.1721"></a>
+<span class="sourceLineNo">1722</span> public void run() {<a
name="line.1722"></a>
+<span class="sourceLineNo">1723</span> long lastUpdate =
EnvironmentEdgeManager.currentTime();<a name="line.1723"></a>
+<span class="sourceLineNo">1724</span> try {<a name="line.1724"></a>
+<span class="sourceLineNo">1725</span> while (isRunning() &&
keepAlive(lastUpdate)) {<a name="line.1725"></a>
+<span class="sourceLineNo">1726</span> this.activeProcedure =
scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);<a name="line.1726"></a>
+<span class="sourceLineNo">1727</span> if (this.activeProcedure ==
null) continue;<a name="line.1727"></a>
+<span class="sourceLineNo">1728</span> int activeCount =
activeExecutorCount.incrementAndGet();<a name="line.1728"></a>
+<span class="sourceLineNo">1729</span> int runningCount =
store.setRunningProcedureCount(activeCount);<a name="line.1729"></a>
+<span class="sourceLineNo">1730</span> if (LOG.isTraceEnabled()) {<a
name="line.1730"></a>
+<span class="sourceLineNo">1731</span> LOG.trace("Execute pid=" +
this.activeProcedure.getProcId() +<a name="line.1731"></a>
+<span class="sourceLineNo">1732</span> " runningCount=" +
runningCount + ", activeCount=" + activeCount);<a name="line.1732"></a>
+<span class="sourceLineNo">1733</span> }<a name="line.1733"></a>
+<span class="sourceLineNo">1734</span>
executionStartTime.set(EnvironmentEdgeManager.currentTime());<a
name="line.1734"></a>
+<span class="sourceLineNo">1735</span> try {<a name="line.1735"></a>
+<span class="sourceLineNo">1736</span>
executeProcedure(this.activeProcedure);<a name="line.1736"></a>
+<span class="sourceLineNo">1737</span> } catch (AssertionError e) {<a
name="line.1737"></a>
+<span class="sourceLineNo">1738</span> LOG.info("ASSERT pid=" +
this.activeProcedure.getProcId(), e);<a name="line.1738"></a>
+<span class="sourceLineNo">1739</span> throw e;<a
name="line.1739"></a>
+<span class="sourceLineNo">1740</span> } finally {<a
name="line.1740"></a>
+<span class="sourceLineNo">1741</span> activeCount =
activeExecutorCount.decrementAndGet();<a name="line.1741"></a>
+<span class="sourceLineNo">1742</span> runningCount =
store.setRunningProcedureCount(activeCount);<a name="line.1742"></a>
+<span class="sourceLineNo">1743</span> if (LOG.isTraceEnabled())
{<a name="line.1743"></a>
+<span class="sourceLineNo">1744</span> LOG.trace("Halt pid=" +
this.activeProcedure.getProcId() +<a name="line.1744"></a>
+<span class="sourceLineNo">1745</span> " runningCount=" +
runningCount + ", activeCount=" + activeCount);<a name="line.1745"></a>
+<span class="sourceLineNo">1746</span> }<a name="line.1746"></a>
+<span class="sourceLineNo">1747</span> this.activeProcedure =
null;<a name="line.1747"></a>
+<span class="sourceLineNo">1748</span> lastUpdate =
EnvironmentEdgeManager.currentTime();<a name="line.1748"></a>
+<span class="sourceLineNo">1749</span>
executionStartTime.set(Long.MAX_VALUE);<a name="line.1749"></a>
+<span class="sourceLineNo">1750</span> }<a name="line.1750"></a>
+<span class="sourceLineNo">1751</span> }<a name="line.1751"></a>
+<span class="sourceLineNo">1752</span> } catch (Throwable t) {<a
name="line.1752"></a>
+<span class="sourceLineNo">1753</span> LOG.warn("Worker terminating
UNNATURALLY " + this.activeProcedure, t);<a name="line.1753"></a>
+<span class="sourceLineNo">1754</span> } finally {<a name="line.1754"></a>
+<span class="sourceLineNo">1755</span> LOG.debug("Worker
terminated.");<a name="line.1755"></a>
+<span class="sourceLineNo">1756</span> }<a name="line.1756"></a>
+<span class="sourceLineNo">1757</span> workerThreads.remove(this);<a
name="line.1757"></a>
+<span class="sourceLineNo">1758</span> }<a name="line.1758"></a>
+<span class="sourceLineNo">1759</span><a name="line.1759"></a>
+<span class="sourceLineNo">1760</span> @Override<a name="line.1760"></a>
+<span class="sourceLineNo">1761</span> public String toString() {<a
name="line.1761"></a>
+<span class="sourceLineNo">1762</span> Procedure<?> p =
this.activeProcedure;<a name="line.1762"></a>
+<span class="sourceLineNo">1763</span> return getName() + "(pid=" + (p ==
null? Procedure.NO_PROC_ID: p.getProcId() + ")");<a name="line.1763"></a>
+<span class="sourceLineNo">1764</span> }<a name="line.1764"></a>
+<span class="sourceLineNo">1765</span><a name="line.1765"></a>
+<span class="sourceLineNo">1766</span> /**<a name="line.1766"></a>
+<span class="sourceLineNo">1767</span> * @return the time since the
current procedure is running<a name="line.1767"></a>
+<span class="sourceLineNo">1768</span> */<a name="line.1768"></a>
+<span class="sourceLineNo">1769</span> public long getCurrentRunTime() {<a
name="line.1769"></a>
+<span class="sourceLineNo">1770</span> return
EnvironmentEdgeManager.currentTime() - executionStartTime.get();<a
name="line.1770"></a>
+<span class="sourceLineNo">1771</span> }<a name="line.1771"></a>
+<span class="sourceLineNo">1772</span><a name="line.1772"></a>
+<span class="sourceLineNo">1773</span> private boolean keepAlive(final long
lastUpdate) {<a name="line.1773"></a>
+<span class="sourceLineNo">1774</span> if (workerThreads.size() <=
corePoolSize) return true;<a name="line.1774"></a>
+<span class="sourceLineNo">1775</span> return
(EnvironmentEdgeManager.currentTime() - lastUpdate) < keepAliveTime;<a
name="line.1775"></a>
+<span class="sourceLineNo">1776</span> }<a name="line.1776"></a>
+<span class="sourceLineNo">1777</span> }<a name="line.1777"></a>
+<span class="sourceLineNo">1778</span><a name="line.1778"></a>
+<span class="sourceLineNo">1779</span> /**<a name="line.1779"></a>
+<span class="sourceLineNo">1780</span> * Runs task on a period such as check
for stuck workers.<a name="line.1780"></a>
+<span class="sourceLineNo">1781</span> * @see InlineChore<a
name="line.1781"></a>
+<span class="sourceLineNo">1782</span> */<a name="line.1782"></a>
+<span class="sourceLineNo">1783</span> private final class
TimeoutExecutorThread extends StoppableThread {<a name="line.1783"></a>
+<span class="sourceLineNo">1784</span> private final
DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();<a
name="line.1784"></a>
+<span class="sourceLineNo">1785</span><a name="line.1785"></a>
+<span class="sourceLineNo">1786</span> public TimeoutExecutorThread(final
ThreadGroup group) {<a name="line.1786"></a>
+<span class="sourceLineNo">1787</span> super(group, "ProcExecTimeout");<a
name="line.1787"></a>
+<span class="sourceLineNo">1788</span> setDaemon(true);<a
name="line.1788"></a>
+<span class="sourceLineNo">1789</span> }<a name="line.1789"></a>
+<span class="sourceLineNo">1790</span><a name="line.1790"></a>
+<span class="sourceLineNo">1791</span> @Override<a name="line.1791"></a>
+<span class="sourceLineNo">1792</span> public void sendStopSignal() {<a
name="line.1792"></a>
+<span class="sourceLineNo">1793</span>
queue.add(DelayedUtil.DELAYED_POISON);<a name="line.1793"></a>
+<span class="sourceLineNo">1794</span> }<a name="line.1794"></a>
+<span class="sourceLineNo">1795</span><a name="line.1795"></a>
+<span class="sourceLineNo">1796</span> @Override<a name="line.1796"></a>
+<span class="sourceLineNo">1797</span> public void run() {<a
name="line.1797"></a>
+<span class="sourceLineNo">1798</span> final boolean traceEnabled =
LOG.isTraceEnabled();<a name="line.1798"></a>
+<span class="sourceLineNo">1799</span> while (isRunning()) {<a
name="line.1799"></a>
+<span class="sourceLineNo">1800</span> final DelayedWithTimeout task =
DelayedUtil.takeWithoutInterrupt(queue);<a name="line.1800"></a>
+<span class="sourceLineNo">1801</span> if (task == null || task ==
DelayedUtil.DELAYED_POISON) {<a name="line.1801"></a>
+<span class="sourceLineNo">1802</span> // the executor may be
shutting down,<a name="line.1802"></a>
+<span class="sourceLineNo">1803</span> // and the task is just the
shutdown request<a name="line.1803"></a>
+<span class="sourceLineNo">1804</span> continue;<a
name="line.1804"></a>
+<span class="sourceLineNo">1805</span> }<a name="line.1805"></a>
+<span class="sourceLineNo">1806</span><a name="line.1806"></a>
+<span class="sourceLineNo">1807</span> if (traceEnabled) {<a
name="line.1807"></a>
+<span class="sourceLineNo">1808</span> LOG.trace("Executing " +
task);<a name="line.1808"></a>
+<span class="sourceLineNo">1809</span> }<a name="line.1809"></a>
+<span class="sourceLineNo">1810</span><a name="line.1810"></a>
+<span class="sourceLineNo">1811</span> // execute the task<a
name="line.1811"></a>
+<span class="sourceLineNo">1812</span> if (task instanceof InlineChore)
{<a name="line.1812"></a>
+<span class="sourceLineNo">1813</span>
execInlineChore((InlineChore)task);<a name="line.1813"></a>
+<span class="sourceLineNo">1814</span> } else if (task instanceof
DelayedProcedure) {<a name="line.1814"></a>
+<span class="sourceLineNo">1815</span>
execDelayedProcedure((DelayedProcedure)task);<a name="line.1815"></a>
+<span class="sourceLineNo">1816</span> } else {<a name="line.1816"></a>
+<span class="sourceLineNo">1817</span> LOG.error("CODE-BUG unknown
timeout task type " + task);<a name="line.1817"></a>
+<span class="sourceLineNo">1818</span> }<a name="line.1818"></a>
+<span class="sourceLineNo">1819</span> }<a name="line.1819"></a>
+<span class="sourceLineNo">1820</span> }<a name="line.1820"></a>
+<span class="sourceLineNo">1821</span><a name="line.1821"></a>
+<span class="sourceLineNo">1822</span> public void add(final InlineChore
chore) {<a name="line.1822"></a>
+<span class="sourceLineNo">1823</span> chore.refreshTimeout();<a
name="line.1823"></a>
+<span class="sourceLineNo">1824</span> queue.add(chore);<a
name="line.1824"></a>
+<span class="sourceLineNo">1825</span> }<a name="line.1825"></a>
+<span class="sourceLineNo">1826</span><a name="line.1826"></a>
+<span class="sourceLineNo">1827</span> public void add(final Procedure
procedure) {<a name="line.1827"></a>
+<span class="sourceLineNo">1828</span> assert procedure.getState() ==
ProcedureState.WAITING_TIMEOUT;<a name="line.1828"></a>
+<span class="sourceLineNo">1829</span> LOG.info("ADDED " + procedure + ";
timeout=" + procedure.getTimeout() +<a name="line.1829"></a>
+<span class="sourceLineNo">1830</span> ", timestamp=" +
procedure.getTimeoutTimestamp());<a name="line.1830"></a>
+<span class="sourceLineNo">1831</span> queue.add(new
DelayedProcedure(procedure));<a name="line.1831"></a>
+<span class="sourceLineNo">1832</span> }<a name="line.1832"></a>
+<span class="sourceLineNo">1833</span><a name="line.1833"></a>
+<span class="sourceLineNo">1834</span> public boolean remove(final
Procedure procedure) {<a name="line.1834"></a>
+<span class="sourceLineNo">1835</span> return queue.remove(new
DelayedProcedure(procedure));<a name="line.1835"></a>
+<span class="sourceLineNo">1836</span> }<a name="line.1836"></a>
+<span class="sourceLineNo">1837</span><a name="line.1837"></a>
+<span class="sourceLineNo">1838</span> private void execInlineChore(final
InlineChore chore) {<a name="line.1838"></a>
+<span class="sourceLineNo">1839</span> chore.run();<a
name="line.1839"></a>
+<span class="sourceLineNo">1840</span> add(chore);<a name="line.1840"></a>
+<span class="sourceLineNo">1841</span> }<a name="line.1841"></a>
+<span class="sourceLineNo">1842</span><a name="line.1842"></a>
+<span class="sourceLineNo">1843</span> private void
execDelayedProcedure(final DelayedProcedure delayed) {<a name="line.1843"></a>
+<span class="sourceLineNo">1844</span> // TODO: treat this as a normal
procedure, add it to the scheduler and<a name="line.1844"></a>
+<span class="sourceLineNo">1845</span> // let one of the workers handle
it.<a name="line.1845"></a>
+<span class="sourceLineNo">1846</span> // Today we consider
ProcedureInMemoryChore as InlineChores<a name="line.1846"></a>
+<span class="sourceLineNo">1847</span> final Procedure procedure =
delayed.getObject();<a name="line.1847"></a>
+<span class="sourceLineNo">1848</span> if (procedure instanceof
ProcedureInMemoryChore) {<a name="line.1848"></a>
+<span class="sourceLineNo">1849</span>
executeInMemoryChore((ProcedureInMemoryChore)procedure);<a name="line.1849"></a>
+<span class="sourceLineNo">1850</span> // if the procedure is in a
waiting state again, put it back in the queue<a name="line.1850"></a>
+<span class="sourceLineNo">1851</span> procedure.updateTimestamp();<a
name="line.1851"></a>
+<span class="sourceLineNo">1852</span> if (procedure.isWaiting()) {<a
name="line.1852"></a>
+<span class="sourceLineNo">1853</span>
delayed.setTimeout(procedure.getTimeoutTimestamp());<a name="line.1853"></a>
+<span class="sourceLineNo">1854</span> queue.add(delayed);<a
name="line.1854"></a>
+<span class="sourceLineNo">1855</span> }<a name="line.1855"></a>
+<span class="sourceLineNo">1856</span> } else {<a name="line.1856"></a>
+<span class="sourceLineNo">1857</span>
executeTimedoutProcedure(procedure);<a name="line.1857"></a>
+<span class="sourceLineNo">1858</span> }<a name="line.1858"></a>
+<span class="sourceLineNo">1859</span> }<a name="line.1859"></a>
+<span class="sourceLineNo">1860</span><a name="line.1860"></a>
+<span class="sourceLineNo">1861</span> private void
executeInMemoryChore(final ProcedureInMemoryChore chore) {<a
name="line.1861"></a>
+<span class="sourceLineNo">1862</span> if (!chore.isWaiting()) return;<a
name="line.1862"></a>
+<span class="sourceLineNo">1863</span><a name="line.1863"></a>
+<span class="sourceLineNo">1864</span> // The ProcedureInMemoryChore is a
special case, and it acts as a chore.<a name="line.1864"></a>
+<span class="sourceLineNo">1865</span> // instead of bringing the Chore
class in, we reuse this timeout thread for<a name="line.1865"></a>
+<span class="sourceLineNo">1866</span> // this special case.<a
name="line.1866"></a>
+<span class="sourceLineNo">1867</span> try {<a name="line.1867"></a>
+<span class="sourceLineNo">1868</span>
chore.periodicExecute(getEnvironment());<a name="line.1868"></a>
+<span class="sourceLineNo">1869</span> } catch (Throwable e) {<a
name="line.1869"></a>
+<span class="sourceLineNo">1870</span> LOG.error("Ignoring " + chore +
" exception: " + e.getMessage(), e);<a name="line.1870"></a>
+<span class="sourceLineNo">1871</span> }<a name="line.1871"></a>
+<span class="sourceLineNo">1872</span> }<a name="line.1872"></a>
+<span class="sourceLineNo">1873</span><a name="line.1873"></a>
+<span class="sourceLineNo">1874</span> private void
executeTimedoutProcedure(final Procedure proc) {<a name="line.1874"></a>
+<span class="sourceLineNo">1875</span> // The procedure received a
timeout. if the procedure itself does not handle it,<a name="line.1875"></a>
+<span class="sourceLineNo">1876</span> // call abort() and add the
procedure back in the queue for rollback.<a name="line.1876"></a>
+<span class="sourceLineNo">1877</span> if
(proc.setTimeoutFailure(getEnvironment())) {<a name="line.1877"></a>
+<span class="sourceLineNo">1878</span> long rootProcId =
Procedure.getRootProcedureId(procedures, proc);<a name="line.1878"></a>
+<span class="sourceLineNo">1879</span> RootProcedureState procStack =
rollbackStack.get(rootProcId);<a name="line.1879"></a>
+<span class="sourceLineNo">1880</span> procStack.abort();<a
name="line.1880"></a>
+<span class="sourceLineNo">1881</span> store.update(proc);<a
name="line.1881"></a>
+<span class="sourceLineNo">1882</span> scheduler.addFront(proc);<a
name="line.1882"></a>
+<span class="sourceLineNo">1883</span> }<a name="line.1883"></a>
+<span class="sourceLineNo">1884</span> }<a name="line.1884"></a>
+<span class="sourceLineNo">1885</span> }<a name="line.1885"></a>
+<span class="sourceLineNo">1886</span><a name="line.1886"></a>
+<span class="sourceLineNo">1887</span> private static final class
DelayedProcedure<a name="line.1887"></a>
+<span class="sourceLineNo">1888</span> extends
DelayedUtil.DelayedContainerWithTimestamp<Procedure> {<a
name="line.1888"></a>
+<span class="sourceLineNo">1889</span> public DelayedProcedure(final
Procedure procedure) {<a name="line.1889"></a>
+<span class="sourceLineNo">1890</span> super(procedure,
procedure.getTimeoutTimestamp());<a name="line.1890"></a>
+<span class="sourceLineNo">1891</span> }<a name="line.1891"></a>
+<span class="sourceLineNo">1892</span> }<a name="line.1892"></a>
+<span class="sourceLineNo">1893</span><a name="line.1893"></a>
+<span class="sourceLineNo">1894</span> private static abstract class
StoppableThread extends Thread {<a name="line.1894"></a>
+<span class="sourceLineNo">1895</span> public StoppableThread(final
ThreadGroup group, final String name) {<a name="line.1895"></a>
+<span class="sourceLineNo">1896</span> super(group, name);<a
name="line.1896"></a>
+<span class="sourceLineNo">1897</span> }<a name="line.1897"></a>
<span class="sourceLineNo">1898</span><a name="line.1898"></a>
-<span class="sourceLineNo">1899</span> public void awaitTermination() {<a
name="line.1899"></a>
-<span class="sourceLineNo">1900</span> try {<a name="line.1900"></a>
-<span class="sourceLineNo">1901</span> final long startTime =
EnvironmentEdgeManager.currentTime();<a name="line.1901"></a>
-<span class="sourceLineNo">1902</span> for (int i = 0; isAlive(); ++i)
{<a name="line.1902"></a>
-<span class="sourceLineNo">1903</span> sendStopSignal();<a
name="line.1903"></a>
-<span class="sourceLineNo">1904</span> join(250);<a
name="line.1904"></a>
-<span class="sourceLineNo">1905</span> if (i > 0 && (i %
8) == 0) {<a name="line.1905"></a>
-<span class="sourceLineNo">1906</span> LOG.warn("Waiting
termination of thread " + getName() + ", " +<a name="line.1906"></a>
-<span class="sourceLineNo">1907</span>
StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));<a
name="line.1907"></a>
-<span class="sourceLineNo">1908</span> }<a name="line.1908"></a>
-<span class="sourceLineNo">1909</span> }<a name="line.1909"></a>
-<span class="sourceLineNo">1910</span> } catch (InterruptedException e)
{<a name="line.1910"></a>
-<span class="sourceLineNo">1911</span> LOG.warn(getName() + " join wait
got interrupted", e);<a name="line.1911"></a>
-<span class="sourceLineNo">1912</span> }<a name="line.1912"></a>
-<span class="sourceLineNo">1913</span> }<a name="line.1913"></a>
-<span class="sourceLineNo">1914</span> }<a name="line.1914"></a>
-<span class="sourceLineNo">1915</span><a name="line.1915"></a>
-<span class="sourceLineNo">1916</span> //
==========================================================================<a
name="line.1916"></a>
-<span class="sourceLineNo">1917</span> // Inline Chores (executors internal
chores)<a name="line.1917"></a>
+<span class="sourceLineNo">1899</span> public abstract void
sendStopSignal();<a name="line.1899"></a>
+<span class="sourceLineNo">1900</span><a name="line.1900"></a>
+<span class="sourceLineNo">1901</span> public void awaitTermination() {<a
name="line.1901"></a>
+<span class="sourceLineNo">1902</span> try {<a name="line.1902"></a>
+<span class="sourceLineNo">1903</span> final long startTime =
EnvironmentEdgeManager.currentTime();<a name="line.1903"></a>
+<span class="sourceLineNo">1904</span> for (int i = 0; isAlive(); ++i)
{<a name="line.1904"></a>
+<span class="sourceLineNo">1905</span> sendStopSignal();<a
name="line.1905"></a>
+<span class="sourceLineNo">1906</span> join(250);<a
name="line.1906"></a>
+<span class="sourceLineNo">1907</span> if (i > 0 && (i %
8) == 0) {<a name="line.1907"></a>
+<span class="sourceLineNo">1908</span> LOG.warn("Waiting
termination of thread " + getName() + ", " +<a name="line.1908"></a>
+<span class="sourceLineNo">1909</span>
StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));<a
name="line.1909"></a>
+<span class="sourceLineNo">1910</span> }<a name="line.1910"></a>
+<span class="sourceLineNo">1911</span> }<a name="line.1911"></a>
+<span class="sourceLineNo">1912</span> } catch (InterruptedException e)
{<a name="line.1912"></a>
+<span class="sourceLineNo">1913</span> LOG.warn(getName() + " join wait
got interrupted", e);<a name="line.1913"></a>
+<span class="sourceLineNo">1914</span> }<a name="line.1914"></a>
+<span class="sourceLineNo">1915</span> }<a name="line.1915"></a>
+<span class="sourceLineNo">1916</span> }<a name="line.1916"></a>
+<span class="sourceLineNo">1917</span><a name="line.1917"></a>
<span class="sourceLineNo">1918</span> //
==========================================================================<a
name="line.1918"></a>
-<span class="sourceLineNo">1919</span> private static abstract class
InlineChore extends DelayedUtil.DelayedObject implements Runnable {<a
name="line.1919"></a>
-<span class="sourceLineNo">1920</span> private long timeout;<a
name="line.1920"></a>
-<span class="sourceLineNo">1921</span><a name="line.1921"></a>
-<span class="sourceLineNo">1922</span> public abstract int
getTimeoutInterval();<a name="line.1922"></a>
+<span class="sourceLineNo">1919</span> // Inline Chores (executors internal
chores)<a name="line.1919"></a>
+<span class="sourceLineNo">1920</span> //
==========================================================================<a
name="line.1920"></a>
+<span class="sourceLineNo">1921</span> private static abstract class
InlineChore extends DelayedUtil.DelayedObject implements Runnable {<a
name="line.1921"></a>
+<span class="sourceLineNo">1922</span> private long timeout;<a
name="line.1922"></a>
<span class="sourceLineNo">1923</span><a name="line.1923"></a>
-<span class="sourceLineNo">1924</span> protected void refreshTimeout() {<a
name="line.1924"></a>
-<span class="sourceLineNo">1925</span> this.timeout =
EnvironmentEdgeManager.currentTime() + getTimeoutInterval();<a
name="line.1925"></a>
-<span class="sourceLineNo">1926</span> }<a name="line.1926"></a>
-<span class="sourceLineNo">1927</span><a name="line.1927"></a>
-<span class="sourceLineNo">1928</span> @Override<a name="line.1928"></a>
-<span class="sourceLineNo">1929</span> public long getTimeout() {<a
name="line.1929"></a>
-<span class="sourceLineNo">1930</span> return timeout;<a
name="line.1930"></a>
-<span class="sourceLineNo">1931</span> }<a name="line.1931"></a>
-<span class="sourceLineNo">1932</span> }<a name="line.1932"></a>
-<span class="sourceLineNo">1933</span><a name="line.1933"></a>
-<span class="sourceLineNo">1934</span> //
----------------------------------------------------------------------------<a
name="line.1934"></a>
-<span class="sourceLineNo">1935</span> // TODO-MAYBE: Should we provide a
InlineChore to notify the store with the<a name="line.1935"></a>
-<span class="sourceLineNo">1936</span> // full set of procedures pending and
completed to write a compacted<a name="line.1936"></a>
-<span class="sourceLineNo">1937</span> // version of the log (in case is a
log)?<a name="line.1937"></a>
-<span class="sourceLineNo">1938</span> // In theory no, procedures are have a
short life, so at some point the store<a name="line.1938"></a>
-<span class="sourceLineNo">1939</span> // will have the tracker saying
everything is in the last log.<a name="line.1939"></a>
-<span class="sourceLineNo">1940</span> //
----------------------------------------------------------------------------<a
name="line.1940"></a>
-<span class="sourceLineNo">1941</span><a name="line.1941"></a>
-<span class="sourceLineNo">1942</span> private final class WorkerMonitor
extends InlineChore {<a name="line.1942"></a>
-<span class="sourceLineNo">1943</span> public static final String
WORKER_MONITOR_INTERVAL_CONF_KEY =<a name="line.1943"></a>
-<span class="sourceLineNo">1944</span>
"hbase.procedure.worker.monitor.interval.msec";<a name="line.1944"></a>
-<span class="sourceLineNo">1945</span> private static final int
DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec<a name="line.1945"></a>
-<span class="sourceLineNo">1946</span><a name="line.1946"></a>
-<span class="sourceLineNo">1947</span> public static final String
WORKER_STUCK_THRESHOLD_CONF_KEY =<a name="line.1947"></a>
-<span class="sourceLineNo">1948</span>
"hbase.procedure.worker.stuck.threshold.msec";<a name="line.1948"></a>
-<span class="sourceLineNo">1949</span> private static final int
DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec<a name="line.1949"></a>
-<span class="sourceLineNo">1950</span><a name="line.1950"></a>
-<span class="sourceLineNo">1951</span> public static final String
WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =<a name="line.1951"></a>
-<span class="sourceLineNo">1952</span>
"hbase.procedure.worker.add.stuck.percentage";<a name="line.1952"></a>
-<span class="sourceLineNo">1953</span> private static final float
DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck<a name="line.1953"></a>
-<span class="sourceLineNo">1954</span><a name="line.1954"></a>
-<span class="sourceLineNo">1955</span> private float
addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;<a
name="line.1955"></a>
-<span class="sourceLineNo">1956</span> private int timeoutInterval =
DEFAULT_WORKER_MONITOR_INTERVAL;<a name="line.1956"></a>
-<span class="sourceLineNo">1957</span> private int stuckThreshold =
DEFAULT_WORKER_STUCK_THRESHOLD;<a name="line.1957"></a>
-<span class="sourceLineNo">1958</span><a name="line.1958"></a>
-<span class="sourceLineNo">1959</span> public WorkerMonitor() {<a
name="line.1959"></a>
-<span class="sourceLineNo">1960</span> refreshConfig();<a
name="line.1960"></a>
-<span class="sourceLineNo">1961</span> }<a name="line.1961"></a>
-<span class="sourceLineNo">1962</span><a name="line.1962"></a>
-<span class="sourceLineNo">1963</span> @Override<a name="line.1963"></a>
-<span class="sourceLineNo">1964</span> public void run() {<a
name="line.1964"></a>
-<span class="sourceLineNo">1965</span> final int stuckCount =
checkForStuckWorkers();<a name="line.1965"></a>
-<span class="sourceLineNo">1966</span> checkThreadCount(stuckCount);<a
name="line.1966"></a>
-<span class="sourceLineNo">1967</span><a name="line.1967"></a>
-<span class="sourceLineNo">1968</span> // refresh interval (poor man
dynamic conf update)<a name="line.1968"></a>
-<span class="sourceLineNo">1969</span> refreshConfig();<a
name="line.1969"></a>
-<span class="sourceLineNo">1970</span> }<a name="line.1970"></a>
-<span class="sourceLineNo">1971</span><a name="line.1971"></a>
-<span class="sourceLineNo">1972</span> private int checkForStuckWorkers()
{<a name="line.1972"></a>
-<span class="sourceLineNo">1973</span> // check if any of the worker is
stuck<a name="line.1973"></a>
-<span class="sourceLineNo">1974</span> int stuckCount = 0;<a
name="line.1974"></a>
-<span class="sourceLineNo">1975</span> for (WorkerThread worker:
workerThreads) {<a name="line.1975"></a>
-<span class="sourceLineNo">1976</span> if (worker.getCurrentRunTime()
< stuckThreshold) {<a name="line.1976"></a>
-<span class="sourceLineNo">1977</span> continue;<a
name="line.1977"></a>
-<span class="sourceLineNo">1978</span> }<a name="line.1978"></a>
-<span class="sourceLineNo">1979</span><a name="line.1979"></a>
-<span class="sourceLineNo">1980</span> // WARN the worker is stuck<a
name="line.1980"></a>
-<span class="sourceLineNo">1981</span> stuckCount++;<a
name="line.1981"></a>
-<span class="sourceLineNo">1982</span> LOG.warn("Worker stuck " +
worker +<a name="line.1982"></a>
-<span class="sourceLineNo">1983</span> " run time " +
StringUtils.humanTimeDiff(worker.getCurrentRunTime()));<a name="line.1983"></a>
-<span class="sourceLineNo">1984</span> }<a name="line.1984"></a>
-<span class="sourceLineNo">1985</span> return stuckCount;<a
name="line.1985"></a>
-<span class="sourceLineNo">1986</span> }<a name="line.1986"></a>
-<span class="sourceLineNo">1987</span><a name="line.1987"></a>
-<span class="sourceLineNo">1988</span> private void checkThreadCount(final
int stuckCount) {<a name="line.1988"></a>
-<span class="sourceLineNo">1989</span> // nothing to do if there are no
runnable tasks<a name="line.1989"></a>
-<span class="sourceLineNo">1990</span> if (stuckCount < 1 ||
!scheduler.hasRunnables()) return;<a name="line.1990"></a>
-<span class="sourceLineNo">1991</span><a name="line.1991"></a>
-<span class="sourceLineNo">1992</span> // add a new thread if the worker
stuck percentage exceed the threshold limit<a name="line.1992"></a>
-<span class="sourceLineNo">1993</span> // and every handler is active.<a
name="line.1993"></a>
-<span class="sourceLineNo">1994</span> final float stuckPerc =
((float)stuckCount) / workerThreads.size();<a name="line.1994"></a>
-<span class="sourceLineNo">1995</span> if (stuckPerc >=
addWorkerStuckPercentage &&<a name="line.1995"></a>
-<span class="sourceLineNo">1996</span> activeExecutorCount.get() ==
workerThreads.size()) {<a name="line.1996"></a>
-<span class="sourceLineNo">1997</span> final WorkerThread worker = new
WorkerThread(threadGroup);<a name="line.1997"></a>
-<span class="sourceLineNo">1998</span> workerThreads.add(worker);<a
name="line.1998"></a>
-<span class="sourceLineNo">1999</span> worker.start();<a
name="line.1999"></a>
-<span class="sourceLineNo">2000</span> LOG.debug("Added new worker
thread " + worker);<a name="line.2000"></a>
-<span class="sourceLineNo">2001</span> }<a name="line.2001"></a>
-<span class="sourceLineNo">2002</span> }<a name="line.2002"></a>
-<span class="sourceLineNo">2003</span><a name="line.2003"></a>
-<span class="sourceLineNo">2004</span> private void refreshConfig() {<a
name="line.2004"></a>
-<span class="sourceLineNo">2005</span> addWorkerStuckPercentage =
conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,<a name="line.2005"></a>
-<span class="sourceLineNo">2006</span>
DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);<a name="line.2006"></a>
-<span class="sourceLineNo">2007</span> timeoutInterval =
conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,<a name="line.2007"></a>
-<span class="sourceLineNo">2008</span>
DEFAULT_WORKER_MONITOR_INTERVAL);<a name="line.2008"></a>
-<span class="sourceLineNo">2009</span> stuckThreshold =
conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,<a name="line.2009"></a>
-<span class="sourceLineNo">2010</span>
DEFAULT_WORKER_STUCK_THRESHOLD);<a name="line.2010"></a>
-<span class="sourceLineNo">2011</span> }<a name="line.2011"></a>
-<span class="sourceLineNo">2012</span><a name="line.2012"></a>
-<span class="sourceLineNo">2013</span> @Override<a name="line.2013"></a>
-<span class="sourceLineNo">2014</span> public int getTimeoutInterval() {<a
name="line.2014"></a>
-<span class="sourceLineNo">2015</span> return timeoutInterval;<a
name="line.2015"></a>
-<span class="sourceLineNo">2016</span> }<a name="line.2016"></a>
-<span class="sourceLineNo">2017</span> }<a name="line.2017"></a>
-<span class="sourceLineNo">2018</span>}<a name="line.2018"></a>
+<span class="sourceLineNo">1924</span> public abstract int
getTimeoutInterval();<a name="line.1924"></a>
+<span class="sourceLineNo">1925</span><a name="line.1925"></a>
+<span class="sourceLineNo">1926</span> protected void refreshTimeout() {<a
name="line.1926"></a>
+<span class="sourceLineNo">1927</span> this.timeout =
EnvironmentEdgeManager.currentTime() + getTimeoutInterval();<a
name="line.1927"></a>
+<span class="sourceLineNo">1928</span> }<a name="line.1928"></a>
+<span class="sourceLineNo">1929</span><a name="line.1929"></a>
+<span class="sourceLineNo">1930</span> @Override<a name="line.1930"></a>
+<span class="sourceLineNo">1931</span> public long getTimeout() {<a
name="line.1931"></a>
+<span class="sourceLineNo">1932</span> return timeout;<a
name="line.1932"></a>
+<span class="sourceLineNo">1933</span> }<a name="line.1933"></a>
+<span class="sourceLineNo">1934</span> }<a name="line.1934"></a>
+<span class="sourceLineNo">1935</span><a name="line.1935"></a>
+<span class="sourceLineNo">1936</span> //
----------------------------------------------------------------------------<a
name="line.1936"></a>
+<span class="sourceLineNo">1937</span> // TODO-MAYBE: Should we provide a
InlineChore to notify the store with the<a name="line.1937"></a>
+<span class="sourceLineNo">1938</span> // full set of procedures pending and
completed to write a compacted<a name="line.1938"></a>
+<span class="sourceLineNo">1939</span> // version of the log (in case is a
log)?<a name="line.1939"></a>
+<span class="sourceLineNo">1940</span> // In theory no, procedures are have a
short life, so at some point the store<a name="line.1940"></a>
+<span class="sourceLineNo">1941</span> // will have the tracker saying
everything is in the last log.<a name="line.1941"></a>
+<span class="sourceLineNo">1942</span> //
----------------------------------------------------------------------------<a
name="line.1942"></a>
+<span class="sourceLineNo">1943</span><a name="line.1943"></a>
+<span class="sourceLineNo">1944</span> private final class WorkerMonitor
extends InlineChore {<a name="line.1944"></a>
+<span class="sourceLineNo">1945</span> public static final String
WORKER_MONITOR_INTERVAL_CONF_KEY =<a name="line.1945"></a>
+<span class="sourceLineNo">1946</span>
"hbase.procedure.worker.monitor.interval.msec";<a name="line.1946"></a>
+<span class="sourceLineNo">1947</span> private static final int
DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec<a name="line.1947"></a>
+<span class="sourceLineNo">1948</span><a name="line.1948"></a>
+<span class="sourceLineNo">1949</span> public static final String
WORKER_STUCK_THRESHOLD_CONF_KEY =<a name="line.1949"></a>
+<span class="sourceLineNo">1950</span>
"hbase.procedure.worker.stuck.threshold.msec";<a name="line.1950"></a>
+<span class="sourceLineNo">1951</span> private static final int
DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec<a name="line.1951"></a>
+<span class="sourceLineNo">1952</span><a name="line.1952"></a>
+<span class="sourceLineNo">1953</span> public static final String
WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =<a name="line.1953"></a>
+<span class="sourceLineNo">1954</span>
"hbase.procedure.worker.add.stuck.percentage";<a name="line.1954"></a>
+<span class="sourceLineNo">1955</span> private static final float
DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck<a name="line.1955"></a>
+<span class="sourceLineNo">1956</span><a name="line.1956"></a>
+<span class="sourceLineNo">1957</span> private float
addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;<a
name="line.1957"></a>
+<span class="sourceLineNo">1958</span> private int timeoutInterval =
DEFAULT_WORKER_MONITOR_INTERVAL;<a name="line.1958"></a>
+<span class="sourceLineNo">1959</span> private int stuckThreshold =
DEFAULT_WORKER_STUCK_THRESHOLD;<a name="line.1959"></a>
+<span class="sourceLineNo">1960</span><a name="line.1960"></a>
+<span class="sourceLineNo">1961</span> public WorkerMonitor() {<a
name="line.1961"></a>
+<span class="sourceLineNo">1962</span> refreshConfig();<a
name="line.1962"></a>
+<span class="sourceLineNo">1963</span> }<a name="line.1963"></a>
+<span class="sourceLineNo">1964</span><a name="line.1964"></a>
+<span class="sourceLineNo">1965</span> @Override<a name="line.1965"></a>
+<span class="sourceLineNo">1966</span> public void run() {<a
name="line.1966"></a>
+<span class="sourceLineNo">1967</span> final int stuckCount =
checkForStuckWorkers();<a name="line.1967"></a>
+<span class="sourceLineNo">1968</span> checkThreadCount(stuckCount);<a
name="line.1968"></a>
+<span class="sourceLineNo">1969</span><a name="line.1969"></a>
+<span class="sourceLineNo">1970</span> // refresh interval (poor man
dynamic conf update)<a name="line.1970"></a>
+<span class="sourceLineNo">1971</span> refreshConfig();<a
name="line.1971"></a>
+<span class="sourceLineNo">1972</span> }<a name="line.1972"></a>
+<span class="sourceLineNo">1973</span><a name="line.1973"></a>
+<span class="sourceLineNo">1974</span> private int checkForStuckWorkers()
{<a name="line.1974"></a>
+<span class="sourceLineNo">1975</span> // check if any of the worker is
stuck<a name="line.1975"></a>
+<span class="sourceLineNo">1976</span> int stuckCount = 0;<a
name="line.1976"></a>
+<span class="sourceLineNo">1977</span> for (WorkerThread worker:
workerThreads) {<a name="line.1977"></a>
+<span class="sourceLineNo">1978</span> if (worker.getCurrentRunTime()
< stuckThreshold) {<a name="line.1978"></a>
+<span class="sourceLineNo">1979</span> continue;<a
name="line.1979"></a>
+<span class="sourceLineNo">1980</span> }<a name="line.1980"></a>
+<span class="sourceLineNo">1981</span><a name="line.1981"></a>
+<span class="sourceLineNo">1982</span> // WARN the worker is stuck<a
name="line.1982"></a>
+<span class="sourceLineNo">1983</span> stuckCount++;<a
name="line.1983"></a>
+<span class="sourceLineNo">1984</span> LOG.warn("Worker stuck " +
worker +<a name="line.1984"></a>
+<span class="sourceLineNo">1985</span> " run time " +
StringUtils.humanTimeDiff(worker.getCurrentRunTime()));<a name="line.1985"></a>
+<span class="sourceLineNo">1986</span> }<a name="line.1986"></a>
+<span class="sourceLineNo">1987</span> return stuckCount;<a
name="line.1987"></a>
+<span class="sourceLineNo">1988</span> }<a name="line.1988"></a>
+<span class="sourceLineNo">1989</span><a name="line.1989"></a>
+<span class="sourceLineNo">1990</span> private void checkThreadCount(final
int stuckCount) {<a name="line.1990"></a>
+<span class="sourceLineNo">1991</span> // nothing to do if there are no
runnable tasks<a name="line.1991"></a>
+<span class="sourceLineNo">1992</span> if (stuckCount < 1 ||
!scheduler.hasRunnables()) return;<a name="line.1992"></a>
+<span class="sourceLineNo">1993</span><a name="line.1993"></a>
+<span class="sourceLineNo">1994</span> // add a new thread if the worker
stuck percentage exceed the threshold limit<a name="line.1994"></a>
+<span class="sourceLineNo">1995</span> // and every handler is active.<a
name="line.1995"></a>
+<span class="sourceLineNo">1996</span> final float stuckPerc =
((float)stuckCount) / workerThreads.size();<a name="line.1996"></a>
+<span class="sourceLineNo">1997</span> if (stuckPerc >=
addWorkerStuckPercentage &&<a name="line.1997"></a>
+<span class="sourceLineNo">1998</span> activeExecutorCount.get() ==
workerThreads.size()) {<a name="line.1998"></a>
+<span class="sourceLineNo">1999</span> final WorkerThread worker = new
WorkerThread(threadGroup);<a name="line.1999"></a>
+<span class="sourceLineNo">2000</span> workerThreads.add(worker);<a
name="line.2000"></a>
+<span class="sourceLineNo">2001</span> worker.start();<a
name="line.2001"></a>
+<span class="sourceLineNo">2002</span> LOG.debug("Added new worker
thread " + worker);<a name="line.2002"></a>
+<span class="sourceLineNo">2003</span> }<a name="line.2003"></a>
+<span class="sourceLineNo">2004</span> }<a name="line.2004"></a>
+<span class="sourceLineNo">2005</span><a name="line.2005"></a>
+<span class="sourceLineNo">2006</span> private void refreshConfig() {<a
name="line.2006"></a>
+<span class="sourceLineNo">2007</span> addWorkerStuckPercentage =
conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,<a name="line.2007"></a>
+<span class="sourceLineNo">2008</span>
DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);<a name="line.2008"></a>
+<span class="sourceLineNo">2009</span> timeoutInterval =
conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,<a name="line.2009"></a>
+<span class="sourceLineNo">2010</span>
DEFAULT_WORKER_MONITOR_INTERVAL);<a name="line.2010"></a>
+<span class="sourceLineNo">2011</span> stuckThreshold =
conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,<a name="line.2011"></a>
+<span class="sourceLineNo">2012</span>
DEFAULT_WORKER_STUCK_THRESHOLD);<a name="line.2012"></a>
+<span class="sourceLineNo">2013</span> }<a name="line.2013"></a>
+<span class="sourceLineNo">2014</span><a name="line.2014"></a>
+<span class="sourceLineNo">2015</span> @Override<a name="line.2015"></a>
+<span class="sourceLineNo">2016</span> public int getTimeoutInterval() {<a
name="line.2016"></a>
+<span class="sourceLineNo">2017</span> return timeoutInterval;<a
name="line.2017"></a>
+<span class="sourceLineNo">2018</span> }<a name="line.2018"></a>
+<span class="sourceLineNo">2019</span> }<a name="line.2019"></a>
+<span class="sourceLineNo">2020</span>}<a name="line.2020"></a>