Updated termination process
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6934c7a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6934c7a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6934c7a0 Branch: refs/heads/master Commit: 6934c7a0d4c479392c6c147d00af8cd9ed7b3c44 Parents: c837a2c Author: mfranklin <[email protected]> Authored: Thu May 1 18:45:30 2014 -0400 Committer: mfranklin <[email protected]> Committed: Thu May 1 18:45:30 2014 -0400 ---------------------------------------------------------------------- .../local/builders/LocalStreamBuilder.java | 161 ++++++++++++------- .../local/tasks/StreamsPersistWriterTask.java | 6 +- .../local/tasks/StreamsProcessorTask.java | 2 +- 3 files changed, 110 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6934c7a0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java index 8e688ba..d313b3f 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java @@ -1,5 +1,6 @@ package org.apache.streams.local.builders; +import org.apache.log4j.spi.LoggerFactory; import org.apache.streams.core.*; import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread; import org.apache.streams.local.tasks.StatusCounterMonitorThread; @@ -7,6 +8,7 @@ import org.apache.streams.local.tasks.StreamsProviderTask; import org.apache.streams.local.tasks.StreamsTask; import org.apache.streams.util.SerializationUtil; import org.joda.time.DateTime; +import org.slf4j.Logger; import java.math.BigInteger; import java.util.*; @@ -22,6 +24,8 @@ import java.util.concurrent.TimeUnit; */ public class LocalStreamBuilder implements StreamBuilder { + private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(LocalStreamBuilder.class); + public static final String TIMEOUT_KEY = "TIMEOUT"; private Map<String, StreamComponent> providers; private Map<String, StreamComponent> components; @@ -32,6 +36,7 @@ public class LocalStreamBuilder implements StreamBuilder { private int totalTasks; private int monitorTasks; private LocalStreamProcessMonitorThread monitorThread; + private Map<String, List<StreamsTask>> tasks; /** * @@ -139,39 +144,18 @@ public class LocalStreamBuilder implements StreamBuilder { */ @Override public void start() { + attachShutdownHandler(); boolean isRunning = true; this.executor = Executors.newFixedThreadPool(this.totalTasks); this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1); Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>(); - Map<String, List<StreamsTask>> streamsTasks = new HashMap<String, List<StreamsTask>>(); + tasks = new HashMap<String, List<StreamsTask>>(); try { monitorThread = new LocalStreamProcessMonitorThread(executor, 10); this.monitor.submit(monitorThread); - for(StreamComponent comp : this.components.values()) { - int tasks = comp.getNumTasks(); - List<StreamsTask> compTasks = new LinkedList<StreamsTask>(); - for(int i=0; i < tasks; ++i) { - StreamsTask task = comp.createConnectedTask(getTimeout()); - task.setStreamConfig(this.streamConfig); - this.executor.submit(task); - compTasks.add(task); - if( comp.isOperationCountable() ) { - this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10)); - this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); - } - } - streamsTasks.put(comp.getId(), compTasks); - } - for(StreamComponent prov : this.providers.values()) { - StreamsTask task = prov.createConnectedTask(getTimeout()); - task.setStreamConfig(this.streamConfig); - this.executor.submit(task); - provTasks.put(prov.getId(), (StreamsProviderTask) task); - if( prov.isOperationCountable() ) { - this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10)); - this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); - } - } + setupComponentTasks(tasks); + setupProviderTasks(provTasks); + LOGGER.info("Started stream with {} components", tasks.size()); while(isRunning) { isRunning = false; for(StreamsProviderTask task : provTasks.values()) { @@ -184,44 +168,98 @@ public class LocalStreamBuilder implements StreamBuilder { Thread.sleep(3000); } } - monitorThread.shutdown(); - this.executor.shutdown(); - //complete stream shut down gracfully - for(StreamComponent prov : this.providers.values()) { - shutDownTask(prov, streamsTasks); + LOGGER.debug("Components are no longer running or timed out due to completion"); + shutdown(tasks); + } catch (InterruptedException e){ + forceShutdown(tasks); + } + + } + + private void attachShutdownHandler() { + final LocalStreamBuilder self = this; + LOGGER.debug("Attaching shutdown handler"); + Runtime.getRuntime().addShutdownHook(new Thread(){ + @Override + public void run() { + LOGGER.debug("Shutdown hook received. Beginning shutdown"); + self.stop(); + } + }); + } + + protected void forceShutdown(Map<String, List<StreamsTask>> streamsTasks) { + LOGGER.debug("Shutdown failed. Forcing shutdown"); + //give the stream 30secs to try to shutdown gracefully, then force shutdown otherwise + for(List<StreamsTask> tasks : streamsTasks.values()) { + for(StreamsTask task : tasks) { + task.stopTask(); } - //need to make this configurable - if(!this.executor.awaitTermination(10, TimeUnit.SECONDS)) { // all threads should have terminated already. + } + this.executor.shutdown(); + this.monitor.shutdown(); + try { + if(!this.executor.awaitTermination(3, TimeUnit.SECONDS)){ this.executor.shutdownNow(); - this.executor.awaitTermination(10, TimeUnit.SECONDS); } - if(!this.monitor.awaitTermination(5, TimeUnit.SECONDS)) { // all threads should have terminated already. + if(!this.monitor.awaitTermination(3, TimeUnit.SECONDS)){ this.monitor.shutdownNow(); - this.monitor.awaitTermination(5, TimeUnit.SECONDS); } - } catch (InterruptedException e){ - //give the stream 30secs to try to shutdown gracefully, then force shutdown otherwise - for(List<StreamsTask> tasks : streamsTasks.values()) { - for(StreamsTask task : tasks) { - task.stopTask(); - } + }catch (InterruptedException ie) { + this.executor.shutdownNow(); + this.monitor.shutdownNow(); + throw new RuntimeException(ie); + } + } + + protected void shutdown(Map<String, List<StreamsTask>> streamsTasks) throws InterruptedException { + LOGGER.info("Attempting to shutdown tasks"); + monitorThread.shutdown(); + this.executor.shutdown(); + //complete stream shut down gracfully + for(StreamComponent prov : this.providers.values()) { + shutDownTask(prov, streamsTasks); + } + //need to make this configurable + if(!this.executor.awaitTermination(10, TimeUnit.SECONDS)) { // all threads should have terminated already. + this.executor.shutdownNow(); + this.executor.awaitTermination(10, TimeUnit.SECONDS); + } + if(!this.monitor.awaitTermination(5, TimeUnit.SECONDS)) { // all threads should have terminated already. + this.monitor.shutdownNow(); + this.monitor.awaitTermination(5, TimeUnit.SECONDS); + } + } + + protected void setupProviderTasks(Map<String, StreamsProviderTask> provTasks) { + for(StreamComponent prov : this.providers.values()) { + StreamsTask task = prov.createConnectedTask(getTimeout()); + task.setStreamConfig(this.streamConfig); + this.executor.submit(task); + provTasks.put(prov.getId(), (StreamsProviderTask) task); + if( prov.isOperationCountable() ) { + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10)); + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); } - this.executor.shutdown(); - this.monitor.shutdown(); - try { - if(!this.executor.awaitTermination(3, TimeUnit.SECONDS)){ - this.executor.shutdownNow(); - } - if(!this.monitor.awaitTermination(3, TimeUnit.SECONDS)){ - this.monitor.shutdownNow(); + } + } + + protected void setupComponentTasks(Map<String, List<StreamsTask>> streamsTasks) { + for(StreamComponent comp : this.components.values()) { + int tasks = comp.getNumTasks(); + List<StreamsTask> compTasks = new LinkedList<StreamsTask>(); + for(int i=0; i < tasks; ++i) { + StreamsTask task = comp.createConnectedTask(getTimeout()); + task.setStreamConfig(this.streamConfig); + this.executor.submit(task); + compTasks.add(task); + if( comp.isOperationCountable() ) { + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10)); + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); } - }catch (InterruptedException ie) { - this.executor.shutdownNow(); - this.monitor.shutdownNow(); - throw new RuntimeException(ie); } + streamsTasks.put(comp.getId(), compTasks); } - } /** @@ -249,8 +287,13 @@ public class LocalStreamBuilder implements StreamBuilder { task.stopTask(); } for(StreamsTask task : tasks) { - while(task.isRunning()) { + int count = 0; + while(count < 20 && task.isRunning()) { Thread.sleep(500); + count++; + } + if(task.isRunning()) { + LOGGER.warn("Task {} failed to terminate in allotted timeframe", task.toString()); } } } @@ -268,7 +311,11 @@ public class LocalStreamBuilder implements StreamBuilder { */ @Override public void stop() { - + try { + shutdown(tasks); + } catch (Exception e) { + forceShutdown(tasks); + } } private void connectToOtherComponents(String[] conntectToIds, StreamComponent toBeConnected) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6934c7a0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java index 1eac1d9..8146bdd 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java @@ -72,12 +72,13 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt try { this.writer.prepare(this.streamConfig); StreamsDatum datum = this.inQueue.poll(); - while(datum != null || this.keepRunning.get()) { + while(this.keepRunning.get()) { if(datum != null) { try { this.writer.write(datum); statusCounter.incrementStatus(DatumStatus.SUCCESS); } catch (Exception e) { + LOGGER.error("Error writing to persist writer {}", this.writer.getClass().getSimpleName(), e); this.keepRunning.set(false); statusCounter.incrementStatus(DatumStatus.FAIL); } @@ -86,12 +87,15 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt try { Thread.sleep(this.sleepTime); } catch (InterruptedException e) { + LOGGER.warn("Thread interrupted in Writer task for {}",this.writer.getClass().getSimpleName(), e); this.keepRunning.set(false); } } datum = this.inQueue.poll(); } + } catch(Exception e) { + LOGGER.error("Failed to execute Persist Writer {}",this.writer.getClass().getSimpleName(), e); } finally { this.writer.cleanUp(); this.isRunning.set(false); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6934c7a0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java index d1ac905..d4c7a16 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java @@ -67,7 +67,7 @@ public class StreamsProcessorTask extends BaseStreamsTask { try { this.processor.prepare(this.streamConfig); StreamsDatum datum = this.inQueue.poll(); - while(datum != null || this.keepRunning.get()) { + while(this.keepRunning.get()) { if(datum != null) { List<StreamsDatum> output = this.processor.process(datum); if(output != null) {
