[ https://issues.apache.org/jira/browse/STREAMS-605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16549430#comment-16549430 ]

ASF GitHub Bot commented on STREAMS-605:
----------------------------------------

steveblackmon closed pull request #455: resolves STREAMS-605
URL: https://github.com/apache/streams/pull/455
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/graph/Page.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/graph/Page.json
new file mode 100644
index 0000000000..55d6e12633
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/graph/Page.json
@@ -0,0 +1,281 @@
+{
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "type": "object",
+    "javaType": "org.apache.streams.facebook.graph.Page",
+    "javaInterfaces": [
+        "java.io.Serializable"
+    ],
+    "properties": {
+        "about": {
+            "type": "string",
+            "required": false
+        },
+        "username": {
+            "type": "string",
+            "required": false
+        },
+        "accessToken": {
+            "type": "string",
+            "required": false
+        },
+        "birthday": {
+            "type": "string",
+            "required": false
+        },
+        "category": {
+            "type": "string",
+            "required": false
+        },
+        "can_checkin": {
+            "type": "boolean",
+            "required": false
+        },
+        "can_post": {
+            "type": "boolean",
+            "required": false
+        },
+        "checkins": {
+            "type": "integer",
+            "required": false
+        },
+        "communityPage": {
+            "type": "boolean",
+            "required": false
+        },
+        "cover": {
+            "type": "object",
+            "required": false,
+            "properties": {
+                "id": {
+                    "type": "null",
+                    "required": false
+                },
+                "offsetY": {
+                    "type": "number",
+                    "required": false
+                },
+                "source": {
+                    "type": "string",
+                    "required": false
+                }
+            }
+        },
+        "display_subtext": {
+            "type": "string",
+            "required": false
+        },
+        "displayed_message_response_time": {
+            "type": "string",
+            "required": false
+        },
+        "engagement": {
+            "type": "object",
+            "required": false,
+            "properties": {
+                "count": {
+                    "type": "integer",
+                    "required": false
+                },
+                "social_sentence": {
+                    "type": "string",
+                    "required": false
+                }
+            }
+        },
+        "fan_count": {
+            "type": "integer",
+            "required": false
+        },
+        "founded": {
+            "type": "string",
+            "required": false
+        },
+        "general_info": {
+            "type": "string",
+            "required": false
+        },
+        "global_brand_root_id": {
+            "type": "string",
+            "required": false
+        },
+        "id": {
+            "type": "string",
+            "required": false
+        },
+        "is_community_page": {
+            "type": "boolean",
+            "required": false
+        },
+        "is_always_open": {
+            "type": "boolean",
+            "required": false
+        },
+        "is_permanently_closed": {
+            "type": "boolean",
+            "required": false
+        },
+        "is_published": {
+            "type": "boolean",
+            "required": false
+        },
+        "is_unclaimed": {
+            "type": "boolean",
+            "required": false
+        },
+        "is_webhooks_subscribed": {
+            "type": "boolean",
+            "required": false
+        },
+        "leadgen_tos_accepted": {
+            "type": "boolean",
+            "required": false
+        },
+        "link": {
+            "type": "string",
+            "required": false
+        },
+        "location": {
+            "type": "object",
+            "required": false,
+            "properties": {
+                "city": {
+                    "type": "string",
+                    "required": false
+                },
+                "country": {
+                    "type": "string",
+                    "required": false
+                },
+                "latitude": {
+                    "type": "null",
+                    "required": false
+                },
+                "longitude": {
+                    "type": "null",
+                    "required": false
+                },
+                "state": {
+                    "type": "string",
+                    "required": false
+                },
+                "street": {
+                    "type": "string",
+                    "required": false
+                },
+                "text": {
+                    "type": "null",
+                    "required": false
+                },
+                "zip": {
+                    "type": "string",
+                    "required": false
+                }
+            }
+        },
+        "mission": {
+            "type": "string",
+            "required": false
+        },
+        "parking": {
+            "type": "object",
+            "required": false,
+            "properties": {
+                "lot": {
+                    "type": "integer",
+                    "required": false
+                },
+                "street": {
+                    "type": "integer",
+                    "required": false
+                },
+                "valet": {
+                    "type": "integer",
+                    "required": false
+                }
+            }
+        },
+        "name": {
+            "type": "string",
+            "id": "http://jsonschema.net/name",
+            "required": false
+        },
+        "name_with_location_descriptor": {
+            "type": "string",
+            "required": false
+        },
+        "overall_star_rating": {
+            "type": "integer",
+            "required": false
+        },
+        "phone": {
+            "type": "string",
+            "required": false
+        },
+        "picture": {
+            "type": "null",
+            "required": false
+        },
+        "published": {
+            "type": "boolean",
+            "required": false
+        },
+        "rating_count": {
+            "type": "integer",
+            "required": false
+        },
+        "talking_about_count": {
+            "type": "integer",
+            "required": false
+        },
+        "voip_info": {
+            "type": "object",
+            "required": false,
+            "properties": {
+                "has_mobile_app": {
+                    "type": "boolean",
+                    "required": false
+                },
+                "has_permission": {
+                    "type": "boolean",
+                    "required": false
+                },
+                "is_callable": {
+                    "type": "boolean",
+                    "required": false
+                },
+                "is_callable_webrtc": {
+                    "type": "boolean",
+                    "required": false
+                },
+                "is_pushable": {
+                    "type": "boolean",
+                    "required": false
+                },
+                "reason_code": {
+                    "type": "integer",
+                    "required": false
+                },
+                "reason_description": {
+                    "type": "string",
+                    "required": false
+                }
+            }
+        },
+        "verification_status": {
+            "type": "string",
+            "required": false
+        },
+        "website": {
+            "type": "string",
+            "required": false
+        },
+        "were_here_count": {
+            "type": "integer",
+            "required": false
+        }
+    }
+}
\ No newline at end of file
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
index 10a2661dc6..2c76a5f5d6 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
@@ -25,11 +25,15 @@
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.util.ExecutorUtils;
+import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.StreamsJacksonMapperConfiguration;
 import org.apache.streams.twitter.config.SevenDaySearchProviderConfiguration;
 import org.apache.streams.twitter.api.SevenDaySearchRequest;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -54,10 +58,13 @@
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,7 +76,7 @@
 /**
  * Retrieve recent posts from a list of user ids or names.
  */
-public class SevenDaySearchProvider implements StreamsProvider, Serializable {
+public class SevenDaySearchProvider implements Callable<Iterator<Tweet>>, StreamsProvider, Serializable {
 
   private static final String STREAMS_ID = "SevenDaySearchProvider";
 
@@ -91,11 +98,14 @@ public SevenDaySearchProviderConfiguration getConfig() {
 
   protected Twitter client;
 
-  protected ListeningExecutorService executor;
+  protected ExecutorService executor;
 
-  protected final AtomicBoolean running = new AtomicBoolean();
+  StreamsConfiguration streamsConfiguration;
+
+  private List<Callable<Object>> tasks = new ArrayList<>();
+  private List<Future<Object>> futures = new ArrayList<>();
 
-  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+  protected final AtomicBoolean running = new AtomicBoolean();
 
   /**
    * To use from command line:
@@ -136,26 +146,21 @@ public static void main(String[] args) throws Exception {
     SevenDaySearchProviderConfiguration config = new ComponentConfigurator<>(SevenDaySearchProviderConfiguration.class).detectConfiguration();
     SevenDaySearchProvider provider = new SevenDaySearchProvider(config);
 
-    ObjectMapper mapper = new StreamsJacksonMapper(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList()));
-
     PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
-    provider.prepare(config);
-    provider.startStream();
-    do {
-      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-      for (StreamsDatum datum : provider.readCurrent()) {
-        String json;
-        try {
-          json = mapper.writeValueAsString(datum.getDocument());
-          outStream.println(json);
-        } catch (JsonProcessingException ex) {
-          System.err.println(ex.getMessage());
-        }
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance(new StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList())));
+
+    Iterator<Tweet> results = provider.call();
+
+    results.forEachRemaining(d -> {
+      try {
+        outStream.println(mapper.writeValueAsString(d));
+      } catch( Exception e ) {
+        LOGGER.warn("Exception", e);
       }
-    }
-    while ( provider.isRunning() );
-    provider.cleanUp();
+    });
+
     outStream.flush();
+
   }
 
   public SevenDaySearchProvider() {
@@ -184,7 +189,7 @@ public void prepare(Object configurationObject) {
 
     try {
       lock.writeLock().lock();
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -200,7 +205,7 @@ public void prepare(Object configurationObject) {
     request = new SevenDaySearchRequest();
     request.setQ(config.getQ());
 
-    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration();
+    streamsConfiguration = StreamsConfigurator.detectConfiguration();
 
     try {
       client = getTwitterClient();
@@ -211,7 +216,7 @@ public void prepare(Object configurationObject) {
     Objects.requireNonNull(client);
 
     executor = MoreExecutors.listeningDecorator(
-        TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(
+      ExecutorUtils.newFixedThreadPoolWithQueueSize(
             config.getThreadsPerProvider().intValue(),
             streamsConfiguration.getQueueSize().intValue()
         )
@@ -224,24 +229,32 @@ public void prepare(Object configurationObject) {
   @Override
   public void startStream() {
 
-    LOGGER.debug("{} startStream", STREAMS_ID);
+    Objects.requireNonNull(executor);
+
+    LOGGER.info("startStream");
 
     running.set(true);
 
-    executor.shutdown();
+    LOGGER.info("running: {}", running.get());
+
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+
+    LOGGER.info("running: {}", running.get());
 
   }
 
   protected void submitSearchThread() {
 
-      SevenDaySearchProviderTask providerTask = new SevenDaySearchProviderTask(
+      Callable providerTask = new SevenDaySearchProviderTask(
           this,
           client,
         request
       );
-      ListenableFuture future = executor.submit(providerTask);
+      LOGGER.info("Thread Created: {}", request);
+      tasks.add(providerTask);
+      Future future = executor.submit(providerTask);
       futures.add(future);
-      LOGGER.info("Thread Submitted: {}", providerTask.request);
+      LOGGER.info("Thread Submitted: {}", request);
 
   }
 
@@ -255,8 +268,7 @@ public StreamsResultSet readCurrent() {
     try {
       lock.writeLock().lock();
       result = new StreamsResultSet(providerQueue);
-      result.setCounter(new DatumStatusCounter());
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -273,10 +285,6 @@ public StreamsResultSet readCurrent() {
 
   }
 
-  protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<StreamsDatum>();
-  }
-
   public StreamsResultSet readNew(BigInteger sequence) {
     LOGGER.debug("{} readNew", STREAMS_ID);
     throw new NotImplementedException();
@@ -299,35 +307,30 @@ public Twitter getTwitterClient() throws InstantiationException {
 
   @Override
   public void cleanUp() {
-    shutdownAndAwaitTermination(executor);
-  }
-
-  void shutdownAndAwaitTermination(ExecutorService pool) {
-    pool.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-          System.err.println("Pool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      pool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
   }
 
   @Override
   public boolean isRunning() {
-    if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
-      LOGGER.info("Completed");
+    LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
+    LOGGER.debug("tasks.size(): {}", tasks.size());
+    LOGGER.debug("futures.size(): {}", futures.size());
+    if ( tasks.size() > 0 && tasks.size() == futures.size() && executor.isTerminated() ) {
       running.set(false);
-      LOGGER.info("Exiting");
     }
+    LOGGER.debug("isRunning: {}", running.get());
     return running.get();
   }
+
+  @Override
+  public Iterator<Tweet> call() throws Exception {
+    prepare(config);
+    startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+    } while ( isRunning());
+    cleanUp();
+    return providerQueue.stream().map( x -> ((Tweet)x.getDocument())).distinct().iterator();
+
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
index 8bb0b0f4d8..ae52174818 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
@@ -32,14 +32,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
  *  Retrieve recent posts for a single user id.
  */
-public class SevenDaySearchProviderTask implements Runnable {
+public class SevenDaySearchProviderTask implements Callable<Iterator<Tweet>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SevenDaySearchProviderTask.class);
 
@@ -48,6 +51,7 @@
   protected SevenDaySearchProvider provider;
   protected Twitter client;
   protected SevenDaySearchRequest request;
+  protected List<Tweet> responseList;
 
   /**
    * SevenDaySearchProviderTask constructor.
@@ -59,6 +63,7 @@ public SevenDaySearchProviderTask(SevenDaySearchProvider provider, Twitter twitt
     this.provider = provider;
     this.client = twitter;
     this.request = request;
+    this.responseList = new ArrayList<>();
   }
 
   int item_count = 0;
@@ -76,6 +81,8 @@ public void run() {
 
       List<Tweet> statuses = response.getStatuses();
 
+      responseList.addAll(statuses);
+
       last_count = statuses.size();
       if( statuses.size() > 0 ) {
 
@@ -109,6 +116,10 @@ public boolean shouldContinuePulling(int count, int page_count, int item_count)
             && page_count <= provider.getConfig().getMaxPages());
   }
 
-
+  @Override
+  public Iterator<Tweet> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
index 080a30d092..287ab4f9b5 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
@@ -25,11 +25,15 @@
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.util.ExecutorUtils;
+import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.StreamsJacksonMapperConfiguration;
 import org.apache.streams.twitter.config.ThirtyDaySearchProviderConfiguration;
 import org.apache.streams.twitter.api.ThirtyDaySearchRequest;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -54,10 +58,13 @@
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,7 +76,7 @@
 /**
  * Retrieve recent posts from a list of user ids or names.
  */
-public class ThirtyDaySearchProvider implements StreamsProvider, Serializable {
+public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, StreamsProvider, Serializable {
 
   private static final String STREAMS_ID = "ThirtyDaySearchProvider";
 
@@ -91,11 +98,14 @@ public ThirtyDaySearchProviderConfiguration getConfig() {
 
   protected Twitter client;
 
-  protected ListeningExecutorService executor;
+  protected ExecutorService executor;
 
-  protected final AtomicBoolean running = new AtomicBoolean();
+  private List<Callable<Object>> tasks = new ArrayList<>();
+  private List<Future<Object>> futures = new ArrayList<>();
+
+  StreamsConfiguration streamsConfiguration;
 
-  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+  protected final AtomicBoolean running = new AtomicBoolean();
 
   /**
    * To use from command line:
@@ -137,26 +147,21 @@ public static void main(String[] args) throws Exception {
     ThirtyDaySearchProviderConfiguration config = new ComponentConfigurator<>(ThirtyDaySearchProviderConfiguration.class).detectConfiguration();
     ThirtyDaySearchProvider provider = new ThirtyDaySearchProvider(config);
 
-    ObjectMapper mapper = new StreamsJacksonMapper(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList()));
-
     PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
-    provider.prepare(config);
-    provider.startStream();
-    do {
-      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-      for (StreamsDatum datum : provider.readCurrent()) {
-        String json;
-        try {
-          json = mapper.writeValueAsString(datum.getDocument());
-          outStream.println(json);
-        } catch (JsonProcessingException ex) {
-          System.err.println(ex.getMessage());
-        }
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance(new StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList())));
+
+    Iterator<Tweet> results = provider.call();
+
+    results.forEachRemaining(d -> {
+      try {
+        outStream.println(mapper.writeValueAsString(d));
+      } catch( Exception e ) {
+        LOGGER.warn("Exception", e);
       }
-    }
-    while ( provider.isRunning() );
-    provider.cleanUp();
+    });
+
     outStream.flush();
+
   }
 
   public ThirtyDaySearchProvider(ThirtyDaySearchProviderConfiguration config) {
@@ -181,7 +186,7 @@ public void prepare(Object configurationObject) {
 
     try {
       lock.writeLock().lock();
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -197,7 +202,7 @@ public void prepare(Object configurationObject) {
     request = new ThirtyDaySearchRequest();
     request.setQuery(config.getQuery());
 
-    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration();
+    streamsConfiguration = StreamsConfigurator.detectConfiguration();
 
     try {
       client = getTwitterClient();
@@ -208,7 +213,7 @@ public void prepare(Object configurationObject) {
     Objects.requireNonNull(client);
 
     executor = MoreExecutors.listeningDecorator(
-        TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(
+      ExecutorUtils.newFixedThreadPoolWithQueueSize(
             config.getThreadsPerProvider().intValue(),
             streamsConfiguration.getQueueSize().intValue()
         )
@@ -221,25 +226,32 @@ public void prepare(Object configurationObject) {
   @Override
   public void startStream() {
 
-    LOGGER.debug("{} startStream", STREAMS_ID);
+    Objects.requireNonNull(executor);
+
+    LOGGER.info("startStream");
 
     running.set(true);
 
-    executor.shutdown();
+    LOGGER.info("running: {}", running.get());
+
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+
+    LOGGER.info("running: {}", running.get());
 
   }
 
   protected void submitSearchThread() {
 
-    ThirtyDaySearchProviderTask providerTask = new ThirtyDaySearchProviderTask(
+    Callable providerTask = new ThirtyDaySearchProviderTask(
           this,
           client,
         request
       );
-      ListenableFuture future = executor.submit(providerTask);
-      futures.add(future);
-      LOGGER.info("Thread Submitted: {}", providerTask.request);
-
+    LOGGER.info("Thread Created: {}", request);
+    tasks.add(providerTask);
+    Future future = executor.submit(providerTask);
+    futures.add(future);
+    LOGGER.info("Thread Submitted: {}", request);
   }
 
   @Override
@@ -252,8 +264,7 @@ public StreamsResultSet readCurrent() {
     try {
       lock.writeLock().lock();
       result = new StreamsResultSet(providerQueue);
-      result.setCounter(new DatumStatusCounter());
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -270,10 +281,6 @@ public StreamsResultSet readCurrent() {
 
   }
 
-  protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<StreamsDatum>();
-  }
-
   public StreamsResultSet readNew(BigInteger sequence) {
     LOGGER.debug("{} readNew", STREAMS_ID);
     throw new NotImplementedException();
@@ -296,35 +303,29 @@ public Twitter getTwitterClient() throws InstantiationException {
 
   @Override
   public void cleanUp() {
-    shutdownAndAwaitTermination(executor);
-  }
-
-  void shutdownAndAwaitTermination(ExecutorService pool) {
-    pool.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-          System.err.println("Pool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      pool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
   }
 
   @Override
   public boolean isRunning() {
-    if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
-      LOGGER.info("Completed");
+    LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
+    LOGGER.debug("tasks.size(): {}", tasks.size());
+    LOGGER.debug("futures.size(): {}", futures.size());
+    if ( tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() ) {
       running.set(false);
-      LOGGER.info("Exiting");
     }
+    LOGGER.debug("isRunning: {}", running.get());
     return running.get();
   }
+
+  @Override
+  public Iterator<Tweet> call() throws Exception {
+    prepare(config);
+    startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+    } while ( isRunning());
+    cleanUp();
+    return providerQueue.stream().map( x -> ((Tweet)x.getDocument())).iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
index 07b40e55e2..b4f3618645 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
@@ -31,14 +31,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
  *  Retrieve recent posts for a single user id.
  */
-public class ThirtyDaySearchProviderTask implements Runnable {
+public class ThirtyDaySearchProviderTask implements Callable<Iterator<Tweet>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ThirtyDaySearchProviderTask.class);
 
@@ -47,6 +50,7 @@
   protected ThirtyDaySearchProvider provider;
   protected Twitter client;
   protected ThirtyDaySearchRequest request;
+  protected List<Tweet> responseList;
 
   /**
    * ThirtyDaySearchProviderTask constructor.
@@ -58,6 +62,7 @@ public ThirtyDaySearchProviderTask(ThirtyDaySearchProvider provider, Twitter twi
     this.provider = provider;
     this.client = twitter;
     this.request = request;
+    this.responseList = new ArrayList<>();
   }
 
   int item_count = 0;
@@ -75,6 +80,8 @@ public void run() {
 
       List<Tweet> statuses = response.getResults();
 
+      responseList.addAll(statuses);
+
       last_count = statuses.size();
       if( statuses.size() > 0 ) {
 
@@ -107,6 +114,9 @@ public boolean shouldContinuePulling(int count, int page_count, int item_count)
         && page_count <= provider.getConfig().getMaxPages());
   }
 
-
-
+  @Override
+  public Iterator<Tweet> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEngagersProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEngagersProvider.java
index bbb2881bbe..e244f50047 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEngagersProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEngagersProvider.java
@@ -24,8 +24,11 @@
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.util.ExecutorUtils;
+import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.StreamsJacksonMapperConfiguration;
+import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.config.TwitterEngagersProviderConfiguration;
 import org.apache.streams.twitter.config.TwitterTimelineProviderConfiguration;
 import org.apache.streams.twitter.api.RetweetsRequest;
@@ -36,6 +39,7 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -58,8 +62,11 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -73,7 +80,7 @@
 /**
  * Retrieve posts from a list of user ids or names, then provide all of the users who retweeted those posts.
  */
-public class TwitterEngagersProvider extends TwitterTimelineProvider implements StreamsProvider, Serializable {
+public class TwitterEngagersProvider implements Callable<Iterator<User>>, StreamsProvider, Serializable {
 
   private static final String STREAMS_ID = "TwitterEngagersProvider";
 
@@ -91,14 +98,19 @@ public TwitterEngagersProviderConfiguration getConfig() {
 
   protected final AtomicBoolean running = new AtomicBoolean();
 
-  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+  protected Twitter client;
 
-  protected ListeningExecutorService executor;
+  private List<Callable<Object>> tasks = new ArrayList<>();
+  private List<Future<Object>> futures = new ArrayList<>();
+
+  public static ExecutorService executor;
 
   StreamsConfiguration streamsConfiguration;
 
   RetweetsRequest baseRetweetsRequest;
 
+  TwitterTimelineProvider timelineProvider;
+
   /**
    * To use from command line:
    *
@@ -134,29 +146,22 @@ public static void main(String[] args) throws Exception {
     Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)).withFallback(StreamsConfigurator.getConfig());
     StreamsConfigurator.addConfig(testResourceConfig);
 
-    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration();
+    PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance(new StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList())));
+
     TwitterEngagersProviderConfiguration config = new ComponentConfigurator<>(TwitterEngagersProviderConfiguration.class).detectConfiguration();
     TwitterEngagersProvider provider = new TwitterEngagersProvider(config);
 
-    ObjectMapper mapper = StreamsJacksonMapper.getInstance(new StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList())));
+    Iterator<User> results = provider.call();
 
-    PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
-    provider.prepare(config);
-    provider.startStream();
-    do {
-      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-      for (StreamsDatum datum : provider.readCurrent()) {
-        String json;
-        try {
-          json = mapper.writeValueAsString(datum.getDocument());
-          outStream.println(json);
-        } catch (JsonProcessingException ex) {
-          System.err.println(ex.getMessage());
-        }
+    results.forEachRemaining(d -> {
+      try {
+        outStream.println(mapper.writeValueAsString(d));
+      } catch( Exception e ) {
+        LOGGER.warn("Exception", e);
       }
-    }
-    while ( provider.isRunning() );
-    provider.cleanUp();
+    });
+
     outStream.flush();
   }
 
@@ -176,27 +181,37 @@ public String getId() {
   @Override
   public void prepare(Object configurationObject) {
 
+    timelineProvider = new TwitterTimelineProvider(config);
+
     if( configurationObject instanceof TwitterEngagersProviderConfiguration ) {
       this.config = (TwitterEngagersProviderConfiguration)configurationObject;
-      super.prepare(MAPPER.convertValue(this.config, TwitterTimelineProviderConfiguration.class));
+      timelineProvider.prepare(MAPPER.convertValue(this.config, TwitterTimelineProviderConfiguration.class));
     } else if( configurationObject instanceof TwitterTimelineProviderConfiguration ) {
-      super.prepare(configurationObject);
+      timelineProvider.prepare(configurationObject);
       this.config = MAPPER.convertValue(this.config, TwitterEngagersProviderConfiguration.class);
     } else {
-      super.prepare(null);
+      timelineProvider.prepare(null);
     }
 
     streamsConfiguration = StreamsConfigurator.detectConfiguration(StreamsConfigurator.getConfig());
 
     try {
       lock.writeLock().lock();
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
 
+    try {
+      client = getTwitterClient();
+    } catch (InstantiationException e) {
+      LOGGER.error("InstantiationException", e);
+    }
+
+    Objects.requireNonNull(client);
+
     executor = MoreExecutors.listeningDecorator(
-      TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(
+      ExecutorUtils.newFixedThreadPoolWithQueueSize(
         config.getThreadsPerProvider().intValue(),
         streamsConfiguration.getQueueSize().intValue()
       )
@@ -206,41 +221,50 @@ public void prepare(Object configurationObject) {
 
   }
 
+  /**
+   * Get the Twitter client from the TwitterEngagersProviderConfiguration.
+   * @return result
+   */
+  public Twitter getTwitterClient() throws InstantiationException {
+
+    return Twitter.getInstance(config);
+
+  }
+
   @Override
   public void startStream() {
 
     LOGGER.debug("{} startStream", STREAMS_ID);
 
-    super.startStream();
+    Iterator<Tweet> timelineIterator = timelineProvider.call();
 
-    do {
-      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-      Iterator<StreamsDatum> iterator = super.readCurrent().iterator();
-      while (iterator.hasNext()) {
-        StreamsDatum datum = iterator.next();
-        Tweet tweet = (Tweet) datum.getDocument();
-        submitRetweeterIdsTaskThread(tweet.getId());
-      }
-    }
-    while ( super.isRunning() );
-    super.cleanUp();
-    executor.shutdown();
+    List<Tweet> timelineList = Lists.newArrayList(timelineIterator);
+
+    LOGGER.info("running: {}", running.get());
+
+    timelineList.forEach(tweet -> submitRetweeterIdsTaskThread(tweet.getId()));
+
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+
+    LOGGER.info("running: {}", running.get());
 
   }
 
   protected void submitRetweeterIdsTaskThread( Long postId ) {
 
+    Callable<Object> callable = createTask(postId);
+    LOGGER.info("Thread Created: {}", postId);
+    tasks.add(callable);
+    futures.add(executor.submit(callable));
+    LOGGER.info("Thread Submitted: {}", postId);
+
+  }
+
+  protected Callable createTask( Long postId ) {
     RetweetsRequest request = new ComponentConfigurator<>(RetweetsRequest.class).detectConfiguration();
     request.setId(postId);
-      TwitterRetweetsTask providerTask = new TwitterRetweetsTask(
-      this,
-      client,
-      request
-    );
-    ListenableFuture future = executor.submit(providerTask);
-    super.futures.add(future);
-    LOGGER.info("Thread Submitted: {}", providerTask.request);
-
+    Callable callable = new TwitterRetweetsTask(this, client, request);
+    return callable;
   }
 
   @Override
@@ -252,7 +276,7 @@ public StreamsResultSet readCurrent() {
 
     try {
       lock.writeLock().lock();
-      Queue<StreamsDatum> resultQueue = constructQueue();
+      Queue<StreamsDatum> resultQueue = QueueUtils.constructQueue();
       providerQueue.iterator().forEachRemaining(
         datum -> {
           Tweet tweet = ((Tweet) datum.getDocument());
@@ -266,12 +290,12 @@ public StreamsResultSet readCurrent() {
         }
       );
       result = new StreamsResultSet(resultQueue);
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
 
-    if ( result.size() == 0 && providerQueue.isEmpty() && executor.isTerminated() ) {
+    if ( result.size() == 0 && providerQueue.isEmpty() && executor.isShutdown() && executor.isTerminated() ) {
       LOGGER.info("Finished.  Cleaning up...");
 
       running.set(false);
@@ -283,10 +307,6 @@ public StreamsResultSet readCurrent() {
 
   }
 
-  protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<StreamsDatum>();
-  }
-
   public StreamsResultSet readNew(BigInteger sequence) {
     LOGGER.debug("{} readNew", STREAMS_ID);
     throw new NotImplementedException();
@@ -299,36 +319,31 @@ public StreamsResultSet readRange(DateTime start, DateTime end) {
 
   @Override
   public void cleanUp() {
-    super.cleanUp();
-    shutdownAndAwaitTermination(executor);
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
   }
 
-  void shutdownAndAwaitTermination(ExecutorService pool) {
-    pool.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-          System.err.println("Pool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      pool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  @Override
   public boolean isRunning() {
-    if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
-      LOGGER.info("Completed");
+    LOGGER.debug("timelineProvider.isRunning: {}", timelineProvider.isRunning());
+    LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty());
+    LOGGER.debug("providerQueue.size: {}", providerQueue.size());
+    LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
+    LOGGER.debug("tasks.size(): {}", tasks.size());
+    LOGGER.debug("futures.size(): {}", futures.size());
+    if ( timelineProvider.isRunning() == false && tasks.size() > 0 && tasks.size() == futures.size() && executor.isTerminated() ) {
       running.set(false);
-      LOGGER.info("Exiting");
     }
+    LOGGER.debug("isRunning: {}", running.get());
     return running.get();
   }
+
+  @Override
+  public Iterator<User> call() {
+    prepare(config);
+    startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+    } while ( isRunning());
+    cleanUp();
+    return providerQueue.stream().map( x -> ((Tweet)x.getDocument()).getUser()).distinct().iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java
index 7acbeef629..0b2305add9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java
@@ -22,6 +22,7 @@
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.api.FollowersIdsRequest;
 import org.apache.streams.twitter.api.FollowersIdsResponse;
+import org.apache.streams.twitter.api.FriendsIdsResponse;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.pojo.Follow;
 import org.apache.streams.twitter.pojo.User;
@@ -32,10 +33,15 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+
 /**
  *  Retrieve friend or follower connections for a single user id.
  */
-public class TwitterFollowersIdsProviderTask implements Runnable {
+public class TwitterFollowersIdsProviderTask implements Callable<Iterator<FollowersIdsResponse>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowersIdsProviderTask.class);
 
@@ -44,6 +50,7 @@
   protected Twitter client;
   protected TwitterFollowingProvider provider;
   protected FollowersIdsRequest request;
+  protected List<FollowersIdsResponse> responseList;
 
   /**
    * TwitterFollowingProviderTask constructor.
@@ -62,6 +69,8 @@ public void run() {
 
     Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null);
 
+    responseList = new ArrayList<>();
+
     LOGGER.info("Thread Starting: {}", request.toString());
 
     getFollowersIds(request);
@@ -81,6 +90,8 @@ private void getFollowersIds(FollowersIdsRequest request) {
 
       FollowersIdsResponse response = client.ids(request);
 
+      responseList.add(response);
+
       last_count = response.getIds().size();
 
       if (response.getIds().size() > 0) {
@@ -122,4 +133,9 @@ public boolean shouldContinuePulling(long cursor, int count, int page_count, int
             && page_count <= provider.getConfig().getMaxPages());
   }
 
+  @Override
+  public Iterator<FollowersIdsResponse> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
index 5195a132e3..94ec4551f0 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
@@ -28,13 +28,19 @@
 import org.apache.streams.util.ComponentUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+
 /**
  *  Retrieve friend or follower connections for a single user id.
  */
-public class TwitterFollowersListProviderTask implements Runnable {
+public class TwitterFollowersListProviderTask implements Callable<Iterator<FollowersListResponse>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowersListProviderTask.class);
 
@@ -43,6 +49,7 @@
   protected Twitter client;
   protected TwitterFollowingProvider provider;
   protected FollowersListRequest request;
+  protected List<FollowersListResponse> responseList;
 
   /**
    * TwitterFollowersListProviderTask constructor.
@@ -59,8 +66,12 @@ public TwitterFollowersListProviderTask(TwitterFollowingProvider provider, Twitt
   @Override
   public void run() {
 
+    Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null);
+
     LOGGER.info("Thread Starting: {}", request.toString());
 
+    responseList = new ArrayList<>();
+
     getFollowersList(request);
 
     LOGGER.info("Thread Finished: {}", request.toString());
@@ -78,6 +89,8 @@ private void getFollowersList(FollowersListRequest request) {
 
       FollowersListResponse response = client.list(request);
 
+      responseList.add(response);
+
       last_count = response.getUsers().size();
 
       if (response.getUsers().size() > 0) {
@@ -118,4 +131,9 @@ public boolean shouldContinuePulling(long cursor, int count, int page_count, int
             && page_count <= provider.getConfig().getMaxPages());
   }
 
+  @Override
+  public Iterator<FollowersListResponse> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 7e6949cec2..794e82a803 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -25,6 +25,8 @@
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.util.ExecutorUtils;
+import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.StreamsJacksonMapperConfiguration;
 import org.apache.streams.twitter.config.TwitterFollowingConfiguration;
@@ -34,6 +36,7 @@
 import org.apache.streams.twitter.api.FriendsListRequest;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Follow;
 import org.apache.streams.twitter.pojo.User;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -43,6 +46,7 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
@@ -59,26 +63,34 @@
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static sun.misc.PostVMInitHook.run;
 
 /**
  * Retrieve all follow adjacencies from a list of user ids or names.
  */
-public class TwitterFollowingProvider implements StreamsProvider, Serializable {
+public class TwitterFollowingProvider implements Callable<Iterator<Follow>>, StreamsProvider, Serializable {
 
   public static final String STREAMS_ID = "TwitterFollowingProvider";
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProvider.class);
 
   private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
+  private StreamsConfiguration streamsConfiguration;
   private TwitterFollowingConfiguration config;
 
   protected List<String> names = new ArrayList<>();
@@ -86,9 +98,10 @@
 
   protected Twitter client;
 
-  protected ListeningExecutorService executor;
+  public static ExecutorService executor;
 
-  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+  private List<Callable<Object>> tasks = new ArrayList<>();
+  private List<Future<Object>> futures = new ArrayList<>();
 
   protected final AtomicBoolean running = new AtomicBoolean();
 
@@ -129,31 +142,22 @@ public static void main(String[] args) throws Exception {
     Config configFile = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults());
     StreamsConfigurator.addConfig(configFile);
 
-    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration();
+    PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance(new StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList())));
+
     TwitterFollowingConfiguration config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration();
     TwitterFollowingProvider provider = new TwitterFollowingProvider(config);
 
-    StreamsJacksonMapperConfiguration mapperConfiguration = new StreamsJacksonMapperConfiguration()
-        .withDateFormats(Collections.singletonList(TwitterDateTimeFormat.TWITTER_FORMAT));
-    ObjectMapper mapper = StreamsJacksonMapper.getInstance(mapperConfiguration);
+    Iterator<Follow> results = provider.call();
 
-    PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
-    provider.prepare(config);
-    provider.startStream();
-    do {
-      Thread.sleep(streamsConfiguration.getBatchFrequencyMs());
-      for (StreamsDatum datum : provider.readCurrent()) {
-        String json;
-        try {
-          json = mapper.writeValueAsString(datum.getDocument());
-          outStream.println(json);
-        } catch (JsonProcessingException ex) {
-          System.err.println(ex.getMessage());
-        }
+    results.forEachRemaining(d -> {
+      try {
+        outStream.println(mapper.writeValueAsString(d));
+      } catch( Exception e ) {
+        LOGGER.warn("Exception", e);
       }
-    }
-    while ( provider.isRunning());
-    provider.cleanUp();
+    });
+
     outStream.flush();
   }
 
@@ -178,6 +182,8 @@ public String getId() {
 
   public void prepare(Object configurationObject) {
 
+    this.streamsConfiguration = StreamsConfigurator.detectConfiguration();
+
     if( configurationObject instanceof TwitterFollowingConfiguration) {
       this.config = (TwitterFollowingConfiguration) configurationObject;
     }
@@ -203,7 +209,7 @@ public void prepare(Object configurationObject) {
 
     try {
       lock.writeLock().lock();
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -228,7 +234,7 @@ public void prepare(Object configurationObject) {
     Objects.requireNonNull(getConfig().getEndpoint());
 
     executor = MoreExecutors.listeningDecorator(
-        TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(
+      ExecutorUtils.newFixedThreadPoolWithQueueSize(
             config.getThreadsPerProvider().intValue(),
             streamsConfiguration.getQueueSize().intValue()
         )
@@ -237,12 +243,18 @@ public void prepare(Object configurationObject) {
     Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers"));
 
     for (Long id : ids) {
-      submitTask(createTask(id, null));
+      Callable<Object> callable = createTask(id, null);
+      LOGGER.info("Thread Created: {}", id);
+      tasks.add(callable);
+      futures.add(executor.submit(callable));
       LOGGER.info("Thread Submitted: {}", id);
     }
 
     for (String name : names) {
-      submitTask(createTask(null, name));
+      Callable<Object> callable = createTask(null, name);
+      LOGGER.info("Thread Created: {}", name);
+      tasks.add(callable);
+      futures.add(executor.submit(callable));
       LOGGER.info("Thread Submitted: {}", name);
     }
 
@@ -256,45 +268,42 @@ public void startStream() {
 
     running.set(true);
 
-    LOGGER.info("isRunning");
+    LOGGER.info("running: {}", running.get());
+
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
 
-    executor.shutdown();
+    LOGGER.info("running: {}", running.get());
 
   }
 
-  protected Runnable createTask(Long id, String name) {
+  protected Callable createTask(Long id, String name) {
     if( config.getEndpoint().equals("friends") && config.getIdsOnly() == true ) {
       FriendsIdsRequest request = (FriendsIdsRequest)new FriendsIdsRequest().withId(id).withScreenName(name);
       return new TwitterFriendsIdsProviderTask(
-              this,
-              client,
-              request);
+        this,
+        client,
+        request);
     } else if( config.getEndpoint().equals("friends") && config.getIdsOnly() == false ) {
       FriendsListRequest request = (FriendsListRequest)new FriendsListRequest().withId(id).withScreenName(name);
       return new TwitterFriendsListProviderTask(
-          this,
-          client,
-          request);
+        this,
+        client,
+        request);
     } else if( config.getEndpoint().equals("followers") && config.getIdsOnly() == true ) {
       FollowersIdsRequest request = (FollowersIdsRequest)new FollowersIdsRequest().withId(id).withScreenName(name);
       return new TwitterFollowersIdsProviderTask(
-          this,
-          client,
-          request);
+        this,
+        client,
+        request);
     } else if( config.getEndpoint().equals("followers") && config.getIdsOnly() == false ) {
       FollowersListRequest request = (FollowersListRequest)new FollowersListRequest().withId(id).withScreenName(name);
       return new TwitterFollowersListProviderTask(
-          this,
-          client,
-          request);
+        this,
+        client,
+        request);
     } else return null;
   }
 
-  protected void submitTask(Runnable providerTask) {
-    ListenableFuture future = executor.submit(providerTask);
-    futures.add(future);
-  }
-
   protected Twitter getTwitterClient() throws InstantiationException {
     return Twitter.getInstance(config);
   }
@@ -306,8 +315,7 @@ public StreamsResultSet readCurrent() {
     try {
       lock.writeLock().lock();
       result = new StreamsResultSet(providerQueue);
-      result.setCounter(new DatumStatusCounter());
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
       LOGGER.debug("readCurrent: {} Documents", result.size());
     } finally {
       lock.writeLock().unlock();
@@ -332,40 +340,41 @@ public boolean shouldContinuePulling(List<User> users) {
   }
 
   public boolean isRunning() {
-    if ( providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone() ) {
-      LOGGER.info("All Threads Completed");
+    LOGGER.debug("executor.isShutdown: {}", executor.isShutdown());
+    LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
+    LOGGER.debug("tasks.size(): {}", tasks.size());
+    LOGGER.debug("futures.size(): {}", futures.size());
+    boolean allTasksComplete;
+    if( futures.size() > 0) {
+      allTasksComplete = true;
+      for(Future<?> future : futures){
+        allTasksComplete &= future.isDone(); // stays true only while every future is done
+      }
+    } else {
+      allTasksComplete = false;
+    }
+    LOGGER.debug("allTasksComplete: {}", allTasksComplete);
+    boolean finished = allTasksComplete && tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated();
+    LOGGER.debug("finished: {}", finished);
+    if ( finished ) {
       running.set(false);
-      LOGGER.info("Exiting");
     }
     return running.get();
   }
 
-  // abstract this out
-  protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<>();
+  public void cleanUp() {
+    // cleanUp
   }
 
-  // abstract this out
-  void shutdownAndAwaitTermination(ExecutorService pool) {
-    pool.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-          System.err.println("Pool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      pool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
+  @Override
+  public Iterator<Follow> call() {
+    prepare(config);
+    startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+    } while ( isRunning());
+    cleanUp();
+    return providerQueue.stream().map( x -> (Follow)x.getDocument()).iterator();
   }
 
-  public void cleanUp() {
-    shutdownAndAwaitTermination(executor);
-  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
index 5a53123cb3..402e825dc5 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
@@ -32,10 +32,15 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+
 /**
  *  Retrieve friend or follower connections for a single user id.
  */
-public class TwitterFriendsIdsProviderTask implements Runnable {
+public class TwitterFriendsIdsProviderTask implements Callable<Iterator<FriendsIdsResponse>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFriendsIdsProviderTask.class);
 
@@ -44,6 +49,7 @@
   protected Twitter client;
   protected TwitterFollowingProvider provider;
   protected FriendsIdsRequest request;
+  protected List<FriendsIdsResponse> responseList;
 
   /**
    * TwitterFollowingProviderTask constructor.
@@ -62,6 +68,8 @@ public void run() {
 
     Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null);
 
+    responseList = new ArrayList<>();
+
     LOGGER.info("Thread Starting: {}", request.toString());
 
     getFriendsIds(request);
@@ -81,6 +89,8 @@ private void getFriendsIds(FriendsIdsRequest request) {
 
       FriendsIdsResponse response = client.ids(request);
 
+      responseList.add(response);
+
       last_count = response.getIds().size();
 
       if (response.getIds().size() > 0) {
@@ -123,4 +133,9 @@ public boolean shouldContinuePulling(long cursor, int count, int page_count, int
             && page_count <= provider.getConfig().getMaxPages());
   }
 
+  @Override
+  public Iterator<FriendsIdsResponse> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
index e116f2f809..b2917e21cc 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
@@ -20,6 +20,8 @@
 
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.FollowersListResponse;
+import org.apache.streams.twitter.api.FriendsIdsResponse;
 import org.apache.streams.twitter.api.FriendsListRequest;
 import org.apache.streams.twitter.api.FriendsListResponse;
 import org.apache.streams.twitter.api.Twitter;
@@ -32,10 +34,15 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+
 /**
  *  Retrieve friend or follower connections for a single user id.
  */
-public class TwitterFriendsListProviderTask implements Runnable {
+public class TwitterFriendsListProviderTask implements Callable<Iterator<FriendsListResponse>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFriendsListProviderTask.class);
 
@@ -44,6 +51,7 @@
   protected Twitter client;
   protected TwitterFollowingProvider provider;
   protected FriendsListRequest request;
+  protected List<FriendsListResponse> responseList;
 
   /**
    * TwitterFollowingProviderTask constructor.
@@ -67,9 +75,13 @@ public void run() {
 
     Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null);
 
+    responseList = new ArrayList<>();
+
+    LOGGER.info("Thread Starting: {}", request.toString());
+
     getFriendsList(request);
 
-    LOGGER.info(request.getId() != null ? request.getId().toString() : request.getScreenName() + " Thread Finished");
+    LOGGER.info("Thread Finished: {}", request.toString());
 
   }
 
@@ -79,6 +91,8 @@ private void getFriendsList(FriendsListRequest request) {
 
       FriendsListResponse response = client.list(request);
 
+      responseList.add(response);
+
       last_count = response.getUsers().size();
 
       if (response.getUsers().size() > 0) {
@@ -121,4 +135,9 @@ public boolean shouldContinuePulling(long cursor, int count, int page_count, int
             && page_count <= provider.getConfig().getMaxPages());
   }
 
+  @Override
+  public Iterator<FriendsListResponse> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterRetweetsTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterRetweetsTask.java
index acbe008157..b1b39b1258 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterRetweetsTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterRetweetsTask.java
@@ -32,14 +32,16 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
  *  Retrieve recent posts for a single user id.
  */
-public class TwitterRetweetsTask implements Runnable {
+public class TwitterRetweetsTask implements Callable<Iterator<Tweet>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterRetweetsTask.class);
 
@@ -76,4 +78,9 @@ public void run() {
 
   }
 
+  @Override
+  public Iterator<Tweet> call() throws Exception {
+    run();
+    return provider.providerQueue.stream().map(x -> (Tweet)x.getDocument()).iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index d9a0759fb0..5512d358d0 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -25,11 +25,14 @@
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.util.ExecutorUtils;
+import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.config.TwitterTimelineProviderConfiguration;
 import org.apache.streams.twitter.api.StatusesUserTimelineRequest;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -54,10 +57,13 @@
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -71,7 +77,7 @@
 /**
  * Retrieve recent posts from a list of user ids or names.
  */
-public class TwitterTimelineProvider implements StreamsProvider, Serializable {
+public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, StreamsProvider, Serializable {
 
   private static final String STREAMS_ID = "TwitterTimelineProvider";
 
@@ -95,14 +101,17 @@ public TwitterTimelineProviderConfiguration getConfig() {
   protected int idsCount;
   protected Twitter client;
 
-  protected ListeningExecutorService executor;
+  protected ExecutorService executor;
 
   protected DateTime start;
   protected DateTime end;
 
+  StreamsConfiguration streamsConfiguration;
+
   protected final AtomicBoolean running = new AtomicBoolean();
 
-  protected List<ListenableFuture<Object>> futures = new ArrayList<>();
+  private List<Callable<Object>> tasks = new ArrayList<>();
+  private List<Future<Object>> futures = new ArrayList<>();
 
   /**
    * To use from command line:
@@ -193,9 +202,11 @@ public void prepare(Object configurationObject) {
       this.config = (TwitterTimelineProviderConfiguration)configurationObject;
     }
 
+    streamsConfiguration = StreamsConfigurator.detectConfiguration(StreamsConfigurator.getConfig());
+
     try {
       lock.writeLock().lock();
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -233,7 +244,7 @@ public void prepare(Object configurationObject) {
     }
 
     executor = MoreExecutors.listeningDecorator(
-        TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(
+      ExecutorUtils.newFixedThreadPoolWithQueueSize(
             config.getThreadsPerProvider().intValue(),
             streamsConfiguration.getQueueSize().intValue()
         )
@@ -241,16 +252,25 @@ public void prepare(Object configurationObject) {
 
     submitTimelineThreads(ids, names);
 
+    LOGGER.info("tasks: {}", tasks.size());
+    LOGGER.info("futures: {}", futures.size());
+
   }
 
   @Override
   public void startStream() {
 
-    LOGGER.debug("{} startStream", STREAMS_ID);
+    Objects.requireNonNull(executor);
+
+    LOGGER.info("startStream");
 
     running.set(true);
 
-    executor.shutdown();
+    LOGGER.info("running: {}", running.get());
+
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+
+    LOGGER.info("running: {}", running.get());
 
   }
 
@@ -260,27 +280,31 @@ protected void submitTimelineThreads(List<Long> ids, List<String> names) {
       StatusesUserTimelineRequest request = new StatusesUserTimelineRequest();
       request.setUserId(id);
       request.setCount(config.getPageSize());
-      TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(
+      Callable providerTask = new TwitterTimelineProviderTask(
           this,
           client,
           request
       );
-      ListenableFuture future = executor.submit(providerTask);
+      LOGGER.info("Thread Created: {}", request);
+      tasks.add(providerTask);
+      Future future = executor.submit(providerTask);
       futures.add(future);
-      LOGGER.info("Thread Submitted: {}", providerTask.request);
+      LOGGER.info("Thread Submitted: {}", request);
     }
     for (String name : names) {
       StatusesUserTimelineRequest request = new StatusesUserTimelineRequest();
       request.setScreenName(name);
       request.setCount(config.getPageSize());
-      TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(
+      Callable providerTask = new TwitterTimelineProviderTask(
           this,
           client,
           request
       );
-      ListenableFuture future = executor.submit(providerTask);
+      LOGGER.info("Thread Created: {}", request);
+      tasks.add(providerTask);
+      Future future = executor.submit(providerTask);
       futures.add(future);
-      LOGGER.info("Thread Submitted: {}", providerTask.request);
+      LOGGER.info("Thread Submitted: {}", request);
     }
   }
 
@@ -294,8 +318,7 @@ public StreamsResultSet readCurrent() {
     try {
       lock.writeLock().lock();
       result = new StreamsResultSet(providerQueue);
-      result.setCounter(new DatumStatusCounter());
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -312,10 +335,6 @@ public StreamsResultSet readCurrent() {
 
   }
 
-  protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<StreamsDatum>();
-  }
-
   public StreamsResultSet readNew(BigInteger sequence) {
     LOGGER.debug("{} readNew", STREAMS_ID);
     throw new NotImplementedException();
@@ -338,35 +357,31 @@ public Twitter getTwitterClient() throws InstantiationException {
 
   @Override
   public void cleanUp() {
-    shutdownAndAwaitTermination(executor);
-  }
-
-  void shutdownAndAwaitTermination(ExecutorService pool) {
-    pool.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-          System.err.println("Pool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      pool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
   }
 
   @Override
   public boolean isRunning() {
-    if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
-      LOGGER.info("Completed");
+    LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty());
+    LOGGER.debug("providerQueue.size: {}", providerQueue.size());
+    LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
+    LOGGER.debug("tasks.size(): {}", tasks.size());
+    LOGGER.debug("futures.size(): {}", futures.size());
+    if ( tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() ) {
       running.set(false);
-      LOGGER.info("Exiting");
     }
+    LOGGER.debug("isRunning: {}", running.get());
     return running.get();
   }
+
+  @Override
+  public Iterator<Tweet> call() {
+    prepare(config);
+    startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+    } while ( isRunning());
+    cleanUp();
+    return providerQueue.stream().map( x -> (Tweet)x.getDocument()).iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index f8816e0e6f..6bc9822393 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -30,14 +30,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
  *  Retrieve recent posts for a single user id.
  */
-public class TwitterTimelineProviderTask implements Runnable {
+public class TwitterTimelineProviderTask implements Callable<Iterator<Tweet>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProviderTask.class);
 
@@ -46,6 +49,7 @@
   protected TwitterTimelineProvider provider;
   protected Twitter client;
   protected StatusesUserTimelineRequest request;
+  protected List<Tweet> responseList;
 
   /**
    * TwitterTimelineProviderTask constructor.
@@ -68,10 +72,14 @@ public void run() {
 
     LOGGER.info("Thread Starting: {}", request.toString());
 
+    responseList = new ArrayList<>();
+
     do {
 
       List<Tweet> statuses = client.userTimeline(request);
 
+      responseList.addAll(statuses);
+
       last_count = statuses.size();
       if( statuses.size() > 0 ) {
 
@@ -106,5 +114,9 @@ public boolean shouldContinuePulling(int count, int page_count, int item_count)
   }
 
 
-
+  @Override
+  public Iterator<Tweet> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 90cd23d5e3..5e9970a19b 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -25,12 +25,15 @@
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.util.ExecutorUtils;
+import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.config.TwitterFollowingConfiguration;
 import org.apache.streams.twitter.config.TwitterUserInformationConfiguration;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.api.UsersLookupRequest;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.User;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -56,10 +59,12 @@
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -73,7 +78,7 @@
 /**
  * Retrieve current profile status from a list of user ids or names.
  */
-public class TwitterUserInformationProvider implements StreamsProvider, Serializable {
+public class TwitterUserInformationProvider implements Callable<Iterator<User>>, StreamsProvider, Serializable {
 
   private static final String STREAMS_ID = "TwitterUserInformationProvider";
 
@@ -92,6 +97,8 @@
 
   protected volatile Queue<StreamsDatum> providerQueue;
 
+  StreamsConfiguration streamsConfiguration;
+
   public TwitterUserInformationConfiguration getConfig() {
     return config;
   }
@@ -171,11 +178,7 @@ public static void main(String[] args) throws Exception {
   }
 
   // TODO: this should be abstracted out
-  public static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
-    return new ThreadPoolExecutor(numThreads, numThreads,
-        5000L, TimeUnit.MILLISECONDS,
-        new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
-  }
+
 
   /**
    * TwitterUserInformationProvider constructor.
@@ -214,7 +217,7 @@ public void prepare(Object configurationObject) {
     Objects.requireNonNull(config.getInfo());
     Objects.requireNonNull(config.getThreadsPerProvider());
 
-    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration();
+    streamsConfiguration = StreamsConfigurator.detectConfiguration();
 
     Objects.requireNonNull(streamsConfiguration.getQueueSize());
 
@@ -228,7 +231,7 @@ public void prepare(Object configurationObject) {
 
     try {
       lock.writeLock().lock();
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -250,7 +253,7 @@ public void prepare(Object configurationObject) {
     }
 
     executor = MoreExecutors.listeningDecorator(
-        TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(
+        ExecutorUtils.newFixedThreadPoolWithQueueSize(
             config.getThreadsPerProvider().intValue(),
             streamsConfiguration.getQueueSize().intValue()
         )
@@ -332,8 +335,7 @@ public StreamsResultSet readCurrent() {
     try {
       lock.writeLock().lock();
       result = new StreamsResultSet(providerQueue);
-      result.setCounter(new DatumStatusCounter());
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
       LOGGER.debug("readCurrent: {} Documents", result.size());
     } finally {
       lock.writeLock().unlock();
@@ -343,10 +345,6 @@ public StreamsResultSet readCurrent() {
 
   }
 
-  protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<>();
-  }
-
   public StreamsResultSet readNew(BigInteger sequence) {
     LOGGER.debug("{} readNew", STREAMS_ID);
     throw new NotImplementedException();
@@ -372,31 +370,23 @@ public boolean isRunning() {
     return running.get();
   }
 
-  void shutdownAndAwaitTermination(ExecutorService pool) {
-    pool.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-          System.err.println("Pool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      pool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
-  }
-
   protected Twitter getTwitterClient() throws InstantiationException {
     return Twitter.getInstance(config);
   }
 
   @Override
   public void cleanUp() {
-    shutdownAndAwaitTermination(executor);
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+  }
+
+  @Override
+  public Iterator<User> call() throws Exception {
+    prepare(config);
+    startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+    } while ( isRunning());
+    cleanUp();
+    return providerQueue.stream().map( x -> (User)x.getDocument()).iterator();
   }
 }
diff --git a/streams-core/src/main/java/org/apache/streams/core/util/ExecutorUtils.java b/streams-core/src/main/java/org/apache/streams/core/util/ExecutorUtils.java
new file mode 100644
index 0000000000..4fe29c520e
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/util/ExecutorUtils.java
@@ -0,0 +1,36 @@
+package org.apache.streams.core.util;
+
+import org.apache.streams.config.StreamsConfigurator;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class ExecutorUtils {
+
+  public static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
+    return new ThreadPoolExecutor(numThreads, numThreads,
+      5000L, TimeUnit.MILLISECONDS,
+      new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+  }
+
+  public static void shutdownAndAwaitTermination(ExecutorService pool) {
+    pool.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!pool.awaitTermination(StreamsConfigurator.detectConfiguration().getProviderTimeoutMs(), TimeUnit.MILLISECONDS)) {
+        pool.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!pool.awaitTermination(StreamsConfigurator.detectConfiguration().getProviderWaitMs(), TimeUnit.MILLISECONDS)) {
+          System.err.println("Pool did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      pool.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git a/streams-core/src/main/java/org/apache/streams/core/util/QueueUtils.java b/streams-core/src/main/java/org/apache/streams/core/util/QueueUtils.java
new file mode 100644
index 0000000000..bbb7ea7c54
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/util/QueueUtils.java
@@ -0,0 +1,14 @@
+package org.apache.streams.core.util;
+
+import org.apache.streams.core.StreamsDatum;
+
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class QueueUtils {
+
+  public static Queue<StreamsDatum> constructQueue() {
+    return new LinkedBlockingQueue<>();
+  }
+
+}
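
The task classes in the diff above all follow the same shape: run() collects
pages into a list field, and call() runs the task and hands back an iterator
over that list. A minimal, self-contained sketch of that shape follows. The
class PagedTaskSketch and its string "pages" are hypothetical stand-ins for
illustration only; they are not part of the Streams code base and no Twitter
client is involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;

/** Hypothetical stand-in for a paged provider task; not a Streams class. */
public class PagedTaskSketch implements Callable<Iterator<String>>, Runnable {

  // Pretend API pages; a real task would fetch these from a remote client.
  private final List<List<String>> pages;

  // Filled by run(), read by call().
  private List<String> responseList;

  public PagedTaskSketch(List<List<String>> pages) {
    this.pages = pages;
  }

  @Override
  public void run() {
    // Reset on every run so a reused task does not return stale items.
    responseList = new ArrayList<>();
    for (List<String> page : pages) {
      responseList.addAll(page);
    }
  }

  @Override
  public Iterator<String> call() {
    // Callable entry point: do the Runnable work, then expose what was collected.
    run();
    return responseList.iterator();
  }

  public static void main(String[] args) {
    PagedTaskSketch task = new PagedTaskSketch(
        Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")));
    Iterator<String> items = task.call();
    while (items.hasNext()) {
      System.out.println(items.next());
    }
  }
}
```

Keeping Runnable alongside Callable means existing callers that submit the
task as a Runnable keep working, while new callers can read the results from
the returned iterator.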


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement Callable interface on a few twitter providers
> -------------------------------------------------------
>
>                 Key: STREAMS-605
>                 URL: https://issues.apache.org/jira/browse/STREAMS-605
>             Project: Streams
>          Issue Type: Improvement
>            Reporter: Steve Blackmon
>            Assignee: Steve Blackmon
>            Priority: Major
>
> When interfacing with a Provider directly, we wind up needing a lot of 
> boilerplate to get the resulting data.
> It would be useful to be able to directly access the full result set produced 
> with readAll as underlying document objects (not datums) just by submitting a 
> provider to a Java ExecutorService.
> Also this would allow for a provider to be run with a timeout chosen at 
> run-time.
> Implement Callable interface on a few twitter providers as a proof of 
> concept.  Shouldn’t need to interfere with the other interfaces.  If the 
> community likes this approach we can roll it out more broadly and perhaps add 
> Callable to the StreamsProvider interface.
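
The usage the description asks for might look roughly like the sketch below:
a caller submits any Callable<Iterator<T>> to a java.util.concurrent
ExecutorService and picks the timeout at run-time via Future.get. The class
CallableProviderSketch, the helper runWithTimeout, and the fake provider
lambda are assumptions made for illustration; a real caller would pass one of
the providers from this pull request instead.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical caller-side helper; not part of the Streams code base. */
public class CallableProviderSketch {

  /**
   * Runs the provider on its own thread and waits at most timeoutMs for the
   * complete result. Future.get throws TimeoutException if the provider is
   * still running when the timeout elapses.
   */
  public static <T> Iterator<T> runWithTimeout(Callable<Iterator<T>> provider, long timeoutMs)
      throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<Iterator<T>> future = executor.submit(provider);
      return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } finally {
      // Interrupts the provider if it is still running, then releases the thread.
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    // Stand-in for a real provider: returns two documents immediately.
    Callable<Iterator<String>> fakeProvider =
        () -> Arrays.asList("doc-1", "doc-2").iterator();

    Iterator<String> documents = runWithTimeout(fakeProvider, 5_000L);
    while (documents.hasNext()) {
      System.out.println(documents.next());
    }
  }
}
```

The caller receives document objects directly and never touches
StreamsResultSet or StreamsDatum, which is the boilerplate the description
wants to avoid.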



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
