a few missing files
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6adb12a3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6adb12a3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6adb12a3 Branch: refs/heads/master Commit: 6adb12a33d62a1ef99929f92c75f6072993d7bf6 Parents: ec28cc5 Author: Steve Blackmon <[email protected]> Authored: Mon Mar 24 16:19:33 2014 -0500 Committer: Steve Blackmon <[email protected]> Committed: Mon Mar 24 16:19:33 2014 -0500 ---------------------------------------------------------------------- .../streams/core/DatumStatusCountable.java | 10 ++++ .../local/builders/LocalStreamBuilder.java | 6 ++- .../local/tasks/StatusCounterMonitorThread.java | 56 ++++++++++++++++++++ 3 files changed, 70 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6adb12a3/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java new file mode 100644 index 0000000..4fec919 --- /dev/null +++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java @@ -0,0 +1,10 @@ +package org.apache.streams.core; + +/** + * Created by steveblackmon on 3/24/14. + */ +public interface DatumStatusCountable { + + public DatumStatusCounter getDatumStatusCounter(); + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6adb12a3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java index d570573..bf1abe6 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java @@ -154,8 +154,10 @@ public class LocalStreamBuilder implements StreamBuilder { task.setStreamConfig(this.streamConfig); this.executor.submit(task); compTasks.add(task); - if( comp.isOperationCountable() ) - this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable)comp.getOperation(), 10)); + if( comp.isOperationCountable() ) { + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10)); + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); + } } streamsTasks.put(comp.getId(), compTasks); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6adb12a3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java new file mode 100644 index 0000000..c6febbe --- /dev/null +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java @@ -0,0 +1,56 @@ +package org.apache.streams.local.tasks; + +import org.apache.streams.core.DatumStatusCountable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StatusCounterMonitorThread implements Runnable +{ + private static final Logger LOGGER = LoggerFactory.getLogger(StatusCounterMonitorThread.class); + + private DatumStatusCountable task; + + private int seconds; + + private boolean run = true; + + public StatusCounterMonitorThread(DatumStatusCountable task, int delayInSeconds) { + this.task = task; + this.seconds = delayInSeconds; + } + + public void shutdown(){ + this.run = false; + } + + @Override + public void run() + { + while(run){ + + /** + * + * Note: + * Quick class and method to let us see what is going on with the JVM. We need to make sure + * that everything is running with as little memory as possible. If we are generating a heap + * overflow, this will be very apparent by the information shown here. + */ + + LOGGER.debug("{}: {} attempted, {} success, {} partial, {} failed, {} total", + task.getClass(), + task.getDatumStatusCounter().getAttempted(), + task.getDatumStatusCounter().getSuccess(), + task.getDatumStatusCounter().getPartial(), + task.getDatumStatusCounter().getFail(), + task.getDatumStatusCounter().getEmitted()); + + try + { + Thread.sleep(seconds*1000); + } + catch (InterruptedException e) + { } + } + } + +}
