Adding platform-level status counters; debugging a data leak
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ec28cc5e Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ec28cc5e Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ec28cc5e Branch: refs/heads/master Commit: ec28cc5e0b4f70f6b1cc1e3aa8911136f4354813 Parents: ab5165a Author: Steve Blackmon <[email protected]> Authored: Mon Mar 24 15:52:43 2014 -0500 Committer: Steve Blackmon <[email protected]> Committed: Mon Mar 24 15:52:43 2014 -0500 ---------------------------------------------------------------------- .../ElasticsearchPersistWriter.java | 13 +++++++--- .../apache/streams/core/DatumStatusCounter.java | 14 +++++++++++ .../local/tasks/StreamsPersistWriterTask.java | 26 ++++++++++++++++---- .../local/tasks/StreamsProviderTask.java | 14 ++++++----- 4 files changed, 53 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ec28cc5e/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java index 250e15a..9390219 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java @@ -6,8 +6,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.typesafe.config.Config; import 
org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistWriter; +import org.apache.streams.core.*; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -37,7 +36,7 @@ import java.text.DecimalFormat; import java.text.NumberFormat; import java.util.*; -public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable +public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable { public final static String STREAMS_ID = "ElasticsearchPersistWriter"; @@ -530,4 +529,12 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab start(); } + @Override + public DatumStatusCounter getDatumStatusCounter() { + DatumStatusCounter counters = new DatumStatusCounter(); + counters.incrementAttempt(this.batchItemsSent); + counters.incrementStatus(DatumStatus.SUCCESS, this.totalOk); + counters.incrementStatus(DatumStatus.FAIL, this.totalFailed); + return counters; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ec28cc5e/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java index 8730d73..96f73c9 100644 --- a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java +++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java @@ -29,6 +29,10 @@ public class DatumStatusCounter this.attempted += 1; } + public void incrementAttempt(int counter) { + this.attempted += counter; + } + public synchronized void incrementStatus(DatumStatus workStatus) { // 
add this to the record counter switch(workStatus) { @@ -39,6 +43,16 @@ public class DatumStatusCounter this.emitted += 1; } + public synchronized void incrementStatus(DatumStatus workStatus, int counter) { + // add this to the record counter + switch(workStatus) { + case SUCCESS: this.success += counter; break; + case PARTIAL: this.partial += counter; break; + case FAIL: this.fail += counter; break; + } + this.emitted += counter; + } + @Override public String toString() { return "DatumStatusCounter{" + http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ec28cc5e/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java index 882bcb7..1eac1d9 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java @@ -1,7 +1,8 @@ package org.apache.streams.local.tasks; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistWriter; +import org.apache.streams.core.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.LinkedList; import java.util.List; @@ -12,9 +13,9 @@ import java.util.concurrent.atomic.AtomicBoolean; /** * */ -public class StreamsPersistWriterTask extends BaseStreamsTask { - +public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumStatusCountable { + private final static Logger LOGGER = LoggerFactory.getLogger(StreamsPersistWriterTask.class); private StreamsPersistWriter writer; private long sleepTime; @@ -23,6 +24,14 @@ 
public class StreamsPersistWriterTask extends BaseStreamsTask { private Queue<StreamsDatum> inQueue; private AtomicBoolean isRunning; + private DatumStatusCounter statusCounter = new DatumStatusCounter(); + + @Override + public DatumStatusCounter getDatumStatusCounter() { + return this.statusCounter; + } + + /** * Default constructor. Uses default sleep of 500ms when inbound queue is empty. * @param writer writer to execute in task @@ -65,7 +74,13 @@ public class StreamsPersistWriterTask extends BaseStreamsTask { StreamsDatum datum = this.inQueue.poll(); while(datum != null || this.keepRunning.get()) { if(datum != null) { - this.writer.write(datum); + try { + this.writer.write(datum); + statusCounter.incrementStatus(DatumStatus.SUCCESS); + } catch (Exception e) { + this.keepRunning.set(false); + statusCounter.incrementStatus(DatumStatus.FAIL); + } } else { try { @@ -100,4 +115,5 @@ public class StreamsPersistWriterTask extends BaseStreamsTask { queues.add(this.inQueue); return queues; } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ec28cc5e/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java index 7b6792f..5cf515c 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java @@ -119,10 +119,6 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC zeros++; else { zeros = 0; - if( resultSet.getCounter() != null ) { - LOGGER.debug(resultSet.getCounter().toString()); - 
this.statusCounter.add(resultSet.getCounter()); - } } flushResults(resultSet); if( zeros > (DEFAULT_TIMEOUT_MS / DEFAULT_SLEEP_TIME_MS)) @@ -162,8 +158,14 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC if(!this.keepRunning.get()) { break; } - if(datum != null) - super.addToOutgoingQueue(datum); + if(datum != null) { + try { + super.addToOutgoingQueue(datum); + statusCounter.incrementStatus(DatumStatus.SUCCESS); + } catch( Exception e ) { + statusCounter.incrementStatus(DatumStatus.FAIL); + } + } else { try { Thread.sleep(DEFAULT_SLEEP_TIME_MS);
