STREAMS-130 | Updated isRunning logic
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/38ed41ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/38ed41ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/38ed41ae

Branch: refs/heads/STREAMS-46
Commit: 38ed41aeb76c0bca2aae61191e524ffa4814ea02
Parents: e78a43b
Author: mfranklin <[email protected]>
Authored: Wed Jul 9 15:59:35 2014 -0400
Committer: mfranklin <[email protected]>
Committed: Wed Jul 9 15:59:35 2014 -0400

----------------------------------------------------------------------
 .../apache/streams/elasticsearch/ElasticsearchPersistReader.java | 3 ++-
 .../org/apache/streams/sysomos/provider/SysomosProvider.java | 4 +++-
 2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/38ed41ae/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 0f91e8a..4196a46 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -118,9 +118,10 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali
         return readCurrent();
     }
 
+    //If we still have data in the queue, we are still running
     @Override
     public boolean isRunning() {
-        return !readerTask.isDone() && !readerTask.isCancelled();
+        return persistQueue.size() > 0 || (!readerTask.isDone() && !readerTask.isCancelled());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/38ed41ae/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
index fea0b24..07e8c3c 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
@@ -157,9 +157,11 @@ public class SysomosProvider implements StreamsProvider {
         throw new NotImplementedException("readRange not currently implemented");
     }
 
+    //If the provider queue still has data, we are still running. If not, we are running if we have not been signaled
+    //by all completed heartbeats so long as the thread pool is alive
     @Override
     public boolean isRunning() {
-        return completedHeartbeats.size() < this.getConfig().getHeartbeatIds().size() && !(stream.isTerminated() || stream.isShutdown());
+        return providerQueue.size() > 0 || (completedHeartbeats.size() < this.getConfig().getHeartbeatIds().size() && !(stream.isTerminated() || stream.isShutdown()));
     }
 
     @Override
