STREAMS-130 | Updated isRunning logic

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/38ed41ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/38ed41ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/38ed41ae

Branch: refs/heads/STREAMS-46
Commit: 38ed41aeb76c0bca2aae61191e524ffa4814ea02
Parents: e78a43b
Author: mfranklin <[email protected]>
Authored: Wed Jul 9 15:59:35 2014 -0400
Committer: mfranklin <[email protected]>
Committed: Wed Jul 9 15:59:35 2014 -0400

----------------------------------------------------------------------
 .../apache/streams/elasticsearch/ElasticsearchPersistReader.java | 3 ++-
 .../org/apache/streams/sysomos/provider/SysomosProvider.java     | 4 +++-
 2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/38ed41ae/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 0f91e8a..4196a46 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -118,9 +118,10 @@ public class ElasticsearchPersistReader implements 
StreamsPersistReader, Seriali
         return readCurrent();
     }
 
+    //If we still have data in the queue, we are still running
     @Override
     public boolean isRunning() {
-        return !readerTask.isDone() && !readerTask.isCancelled();
+        return persistQueue.size() > 0 || (!readerTask.isDone() && 
!readerTask.isCancelled());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/38ed41ae/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
 
b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
index fea0b24..07e8c3c 100644
--- 
a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
+++ 
b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
@@ -157,9 +157,11 @@ public class SysomosProvider implements StreamsProvider {
         throw new NotImplementedException("readRange not currently 
implemented");
     }
 
+    //If the provider queue still has data, we are still running.  If not, we 
are running if we have not been signaled
+    //by all completed heartbeats so long as the thread pool is alive
     @Override
     public boolean isRunning() {
-        return completedHeartbeats.size() < 
this.getConfig().getHeartbeatIds().size() && !(stream.isTerminated() || 
stream.isShutdown());
+        return providerQueue.size() > 0 || (completedHeartbeats.size() < 
this.getConfig().getHeartbeatIds().size() && !(stream.isTerminated() || 
stream.isShutdown()));
     }
 
     @Override

Reply via email to