Repository: incubator-streams Updated Branches: refs/heads/blueprints 1cf71a4a6 -> 5f133579d Updated Tags: refs/tags/RexsterGraph [created] 99688389a
Implemented new StreamTasksCounter into StreamsTasks Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7e65a423 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7e65a423 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7e65a423 Branch: refs/heads/blueprints Commit: 7e65a423f31c2fc4540b72ade6be904c77638784 Parents: d305371 Author: Ryan Ebanks <[email protected]> Authored: Mon Oct 20 14:16:40 2014 -0500 Committer: Ryan Ebanks <[email protected]> Committed: Mon Oct 20 14:16:40 2014 -0500 ---------------------------------------------------------------------- .../streams/local/tasks/StreamsMergeTask.java | 7 ++++ .../local/tasks/StreamsPersistWriterTask.java | 18 +++++++-- .../local/tasks/StreamsProcessorTask.java | 20 ++++++++-- .../local/tasks/StreamsProviderTask.java | 15 +++++++ .../apache/streams/local/tasks/StreamsTask.java | 4 ++ .../streams/local/tasks/BasicTasksTest.java | 41 ++++++++++++++++++-- 6 files changed, 93 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e65a423/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java index 7a4c806..8280f29 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java @@ -19,6 +19,8 @@ package org.apache.streams.local.tasks; import org.apache.streams.core.StreamsDatum; +import org.apache.streams.local.counters.StreamsTaskCounter; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -78,4 +80,9 @@ public class StreamsMergeTask extends BaseStreamsTask { } } } + + @Override + public void setStreamsTaskCounter(StreamsTaskCounter counter) { + throw new NotImplementedException(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e65a423/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java index cab46b8..003ab9e 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java @@ -20,13 +20,11 @@ package org.apache.streams.local.tasks; import org.apache.streams.core.*; import org.apache.streams.core.util.DatumUtils; +import org.apache.streams.local.counters.StreamsTaskCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,6 +43,7 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt private BlockingQueue<StreamsDatum> inQueue; private AtomicBoolean isRunning; private AtomicBoolean blocked; + private StreamsTaskCounter counter; private DatumStatusCounter statusCounter = new DatumStatusCounter(); @@ -99,6 +98,9 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt public void run() { try { this.writer.prepare(this.streamConfig); + if(this.counter == null) { + this.counter = new StreamsTaskCounter(this.writer.getClass().getName()+ UUID.randomUUID().toString()); + } while(this.keepRunning.get()) { StreamsDatum datum = null; try { @@ -111,14 +113,18 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt Thread.currentThread().interrupt(); } if(datum != null) { + this.counter.incrementReceivedCount(); try { + long startTime = System.currentTimeMillis(); this.writer.write(datum); + this.counter.addTime(System.currentTimeMillis() - startTime); statusCounter.incrementStatus(DatumStatus.SUCCESS); } catch (Exception e) { LOGGER.error("Error writing to persist writer {}", this.writer.getClass().getSimpleName(), e); this.keepRunning.set(false); // why do we shutdown on a failed write ? statusCounter.incrementStatus(DatumStatus.FAIL); DatumUtils.addErrorToMetadata(datum, e, this.writer.getClass()); + this.counter.incrementErrorCount(); } } else { //datums should never be null LOGGER.debug("Received null StreamsDatum @ writer : {}", this.writer.getClass().getName()); @@ -151,4 +157,8 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt return queues; } + @Override + public void setStreamsTaskCounter(StreamsTaskCounter counter) { + this.counter = counter; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e65a423/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java index ee69127..8d66847 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java @@ -21,13 +21,11 @@ package org.apache.streams.local.tasks; import com.google.common.collect.Maps; import org.apache.streams.core.*; import org.apache.streams.core.util.DatumUtils; +import org.apache.streams.local.counters.StreamsTaskCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -47,6 +45,7 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus private BlockingQueue<StreamsDatum> inQueue; private AtomicBoolean isRunning; private AtomicBoolean blocked; + private StreamsTaskCounter counter; private DatumStatusCounter statusCounter = new DatumStatusCounter(); @@ -105,6 +104,9 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus public void run() { try { this.processor.prepare(this.streamConfig); + if(this.counter == null) { + this.counter = new StreamsTaskCounter(this.processor.getClass().getName()+ UUID.randomUUID().toString()); + } while(this.keepRunning.get()) { StreamsDatum datum = null; try { @@ -117,11 +119,15 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus Thread.currentThread().interrupt(); } if(datum != null) { + this.counter.incrementReceivedCount(); try { + long startTime = System.currentTimeMillis(); List<StreamsDatum> output = this.processor.process(datum); + this.counter.addTime(System.currentTimeMillis() - startTime); if(output != null) { for(StreamsDatum outDatum : output) { super.addToOutgoingQueue(datum); + this.counter.incrementEmittedCount(); statusCounter.incrementStatus(DatumStatus.SUCCESS); } } @@ -130,6 +136,7 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus this.keepRunning.set(false); Thread.currentThread().interrupt(); } catch (Throwable t) { + this.counter.incrementErrorCount(); LOGGER.warn("Caught Throwable in processor, {} : {}", this.processor.getClass().getName(), t.getMessage()); statusCounter.incrementStatus(DatumStatus.FAIL); //Add the error to the metadata, but keep processing @@ -151,4 +158,9 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus queues.add(this.inQueue); return queues; } + + @Override + public void setStreamsTaskCounter(StreamsTaskCounter counter) { + this.counter = counter; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e65a423/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java index c16f64d..2475780 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java @@ -20,6 +20,7 @@ package org.apache.streams.local.tasks; import org.apache.streams.core.*; import org.apache.streams.core.util.DatumUtils; +import org.apache.streams.local.counters.StreamsTaskCounter; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory; import java.math.BigInteger; import java.util.Map; import java.util.Queue; +import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -64,6 +66,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC private long sleepTime; private int zeros = 0; private DatumStatusCounter statusCounter = new DatumStatusCounter(); + private StreamsTaskCounter counter; /** * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()} @@ -145,13 +148,18 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC StreamsResultSet resultSet = null; //Negative values mean we want to run forever long maxZeros = timeout < 0 ? Long.MAX_VALUE : (timeout / sleepTime); + if(this.counter == null) { //should never be null + this.counter = new StreamsTaskCounter(this.provider.getClass().getName()+ UUID.randomUUID().toString()); + } switch(this.type) { case PERPETUAL: { provider.startStream(); this.started.set(true); while(this.isRunning()) { try { + long startTime = System.currentTimeMillis(); resultSet = provider.readCurrent(); + this.counter.addTime(System.currentTimeMillis() - startTime); if( resultSet.size() == 0 ) zeros++; else { @@ -164,6 +172,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC if(zeros > 0) Thread.sleep(sleepTime); } catch (InterruptedException e) { + this.counter.incrementErrorCount(); LOGGER.warn("Thread interrupted"); this.keepRunning.set(false); } @@ -219,8 +228,10 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC if(datum != null) { try { super.addToOutgoingQueue(datum); + this.counter.incrementEmittedCount(); statusCounter.incrementStatus(DatumStatus.SUCCESS); } catch( Exception e ) { + this.counter.incrementErrorCount(); statusCounter.incrementStatus(DatumStatus.FAIL); DatumUtils.addErrorToMetadata(datum, e, this.provider.getClass()); } @@ -229,4 +240,8 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC this.flushing.set(false); } + @Override + public void setStreamsTaskCounter(StreamsTaskCounter counter) { + this.counter = counter; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e65a423/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java index 7513631..8423095 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java @@ -19,6 +19,7 @@ package org.apache.streams.local.tasks; import org.apache.streams.core.StreamsDatum; +import org.apache.streams.local.counters.StreamsTaskCounter; import java.util.List; import java.util.Map; @@ -87,4 +88,7 @@ public interface StreamsTask extends Runnable{ */ public List<BlockingQueue<StreamsDatum>> getOutputQueues(); + + public void setStreamsTaskCounter(StreamsTaskCounter counter); + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e65a423/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java index f524db0..f62250d 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java @@ -19,12 +19,18 @@ package org.apache.streams.local.tasks; import org.apache.streams.core.StreamsDatum; +import org.apache.streams.local.counters.DatumStatusCounter; +import org.apache.streams.local.counters.StreamsTaskCounter; import org.apache.streams.local.queues.ThroughputQueue; import org.apache.streams.local.test.processors.PassthroughDatumCounterProcessor; import org.apache.streams.local.test.providers.NumericMessageProvider; import org.apache.streams.local.test.writer.DatumCounterWriter; +import org.junit.After; import org.junit.Test; +import javax.management.InstanceNotFoundException; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; import java.util.Queue; import java.util.concurrent.*; @@ -36,6 +42,21 @@ import static org.junit.Assert.*; public class BasicTasksTest { + private static final String MBEAN_ID = "test_bean"; + + /** + * Remove registered mbeans from previous tests + * @throws Exception + */ + @After + public void unregisterMXBean() throws Exception { + try { + ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(StreamsTaskCounter.NAME_TEMPLATE, MBEAN_ID))); + } catch (InstanceNotFoundException ife) { + //No-op + } + } + @Test public void testProviderTask() { @@ -77,7 +98,7 @@ public class BasicTasksTest { assertTrue("Task should have completed running in aloted time.", service.isTerminated()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - }; + } } @Test @@ -85,6 +106,8 @@ public class BasicTasksTest { int numMessages = 100; PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor(""); StreamsProcessorTask task = new StreamsProcessorTask(processor); + StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID); + task.setStreamsTaskCounter(counter); BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>(); BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); task.addOutputQueue(outQueue); @@ -104,8 +127,7 @@ public class BasicTasksTest { fail("Processor task failed to output "+numMessages+" in a timely fashion."); } } - task.stopTask(); - assertEquals(numMessages, processor.getMessageCount()); + task.stopTask();; service.shutdown(); try { if(!service.awaitTermination(5, TimeUnit.SECONDS)){ @@ -116,6 +138,11 @@ public class BasicTasksTest { } catch (InterruptedException e) { fail("Test Interupted."); } + assertEquals(numMessages, processor.getMessageCount()); + assertEquals(numMessages, counter.getNumReceived()); + assertEquals(numMessages, counter.getNumEmitted()); + assertEquals(0, counter.getNumUnhandledErrors()); + assertEquals(0.0, counter.getErrorRate(), 0.0); } @Test @@ -123,6 +150,8 @@ public class BasicTasksTest { int numMessages = 100; DatumCounterWriter writer = new DatumCounterWriter(""); StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer); + StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID); + task.setStreamsTaskCounter(counter); BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>(); BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); @@ -150,7 +179,6 @@ public class BasicTasksTest { } } task.stopTask(); - assertEquals(numMessages, writer.getDatumsCounted()); service.shutdown(); try { if(!service.awaitTermination(5, TimeUnit.SECONDS)){ @@ -161,6 +189,11 @@ public class BasicTasksTest { } catch (InterruptedException e) { fail("Test Interupted."); } + assertEquals(numMessages, writer.getDatumsCounted()); + assertEquals(numMessages, counter.getNumReceived()); + assertEquals(0, counter.getNumEmitted()); + assertEquals(0, counter.getNumUnhandledErrors()); + assertEquals(0.0, counter.getErrorRate(), 0.0); } @Test
