Author: cwiklik Date: Thu Jan 19 14:56:08 2017 New Revision: 1779475 URL: http://svn.apache.org/viewvc?rev=1779475&view=rev Log: UIMA-5157 synchronized stop and removed killer thread
Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java Modified: uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java?rev=1779475&r1=1779474&r2=1779475&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java (original) +++ uima/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java Thu Jan 19 14:56:08 2017 @@ -142,6 +142,8 @@ public class NodeAgent extends AbstractD private volatile boolean stopping = false; + private Object stopLock = new Object(); + private RogueProcessReaper rogueProcessReaper = new RogueProcessReaper(logger, 5, 10); public volatile boolean useCgroups = false; @@ -1799,68 +1801,75 @@ public class NodeAgent extends AbstractD } public void stop() throws Exception { - if (stopping) { - return; - } - stopping = true; - - // Send an empty process map as the final inventory - HashMap<DuccId, IDuccProcess> emptyMap = - new HashMap<DuccId, IDuccProcess>(); - DuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(emptyMap,getLastORSequence(), getIdentity()); - inventoryDispatcher.dispatch(duccEvent); - logger.info("stop", null, "Agent published final inventory"); - - configurationFactory.stopRoutes(); - - logger.info("stop", null, "Agent stopping managed processes"); - // Dispatch SIGTERM to all child processes - boolean wait = stopChildProcesses(); - - // Stop publishing inventory. Once the route is down the agent forces last publication - // sending an empty process map. - //configurationFactory.stopInventoryRoute(); - - if ( wait && deployedProcesses.size() > 0 ) { - logger.info("stop", null, "Agent Sent SIGTERM to ALL Child Processes - Number of Deployed Processes:"+deployedProcesses.size()); - // wait for awhile - synchronized (this) { - long waittime = 60000; - if (configurationFactory.processStopTimeout != null ) { - try { - waittime = Long.parseLong(configurationFactory.processStopTimeout); - } catch( NumberFormatException e) { - logger.warn("stop", null, e); - } - } - logger.info("stop", null, "Waiting", waittime, "ms to send final NodeInventory."); - wait(waittime); - } - } - - // send kill -9 to any child process still running - killChildProcesses(); - - // Self destruct thread in case we loose AMQ broker and AMQ listener gets into retry - // mode trying to recover a connection - Thread t = new Thread( new Runnable() { - public void run() { - try { - logger.info("stop", null, "Agent waiting for additional 10 seconds to allow for a clean shutdown before terminating itself via System.exit(1) "); - Thread.sleep(10000); - } catch(Exception e ) { - logger.info("stop", null, e); - } finally{ - logger.info("stop", null, "Agent calling System.exit(1) ... "); - System.exit(1); - } - } - }); - t.start(); - t.join(10000); - logger.info("stop", null, "Reaper thread finished - calling super.stop()"); + synchronized(stopLock) { + logger.info("stop", null, "Agent stop()"); + if (stopping) { + return; + } + stopping = true; + + // Send an empty process map as the final inventory + HashMap<DuccId, IDuccProcess> emptyMap = + new HashMap<DuccId, IDuccProcess>(); + DuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(emptyMap,getLastORSequence(), getIdentity()); + inventoryDispatcher.dispatch(duccEvent); + logger.info("stop", null, "Agent published final inventory"); + + configurationFactory.stopRoutes(); + + logger.info("stop", null, "Agent stopping managed processes"); + // Dispatch SIGTERM to all child processes + boolean wait = stopChildProcesses(); + + // Stop publishing inventory. Once the route is down the agent forces last publication + // sending an empty process map. + //configurationFactory.stopInventoryRoute(); + + if ( wait && deployedProcesses.size() > 0 ) { + logger.info("stop", null, "Agent Sent SIGTERM to ALL Child Processes - Number of Deployed Processes:"+deployedProcesses.size()); + // wait for awhile + synchronized (this) { + long waittime = 60000; + if (configurationFactory.processStopTimeout != null ) { + try { + waittime = Long.parseLong(configurationFactory.processStopTimeout); + } catch( NumberFormatException e) { + logger.warn("stop", null, e); + } + } + logger.info("stop", null, "Waiting", waittime, "ms to send final NodeInventory."); + wait(waittime); + } + } - super.stop(); + // send kill -9 to any child process still running + killChildProcesses(); +/* + // Self destruct thread in case we loose AMQ broker and AMQ listener gets into retry + // mode trying to recover a connection + Thread t = new Thread( new Runnable() { + public void run() { + try { + logger.info("stop", null, "Agent waiting for additional 10 seconds to allow for a clean shutdown before terminating itself via System.exit(1) "); + Thread.sleep(10000); + } catch(Exception e ) { + logger.info("stop", null, e); + } finally{ + logger.info("stop", null, "Agent calling System.exit(1) ... "); + System.exit(1); + } + } + }); + t.start(); + t.join(10000); + super.stop(); + logger.info("stop", null, "Reaper thread finished - calling super.stop()"); + } +*/ + stopLock.wait(2000); + super.stop(); + logger.info("stop", null, "Reaper thread finished - calling super.stop()"); + } } public Future<?> getDeployedJPFuture(IDuccId duccId) {