This is an automated email from the ASF dual-hosted git repository.

sblackmon pushed a commit to branch STREAMS-605
in repository https://gitbox.apache.org/repos/asf/streams.git

commit 66508b0cb3ff198b58a43cc90eeb1299bbcafb57
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
AuthorDate: Mon Jun 11 11:16:25 2018 -0500

    resolves STREAMS-605
---
 .../twitter/provider/SevenDaySearchProvider.java   | 119 +++++++-------
 .../provider/SevenDaySearchProviderTask.java       |  13 +-
 .../twitter/provider/ThirtyDaySearchProvider.java  | 121 +++++++-------
 .../provider/ThirtyDaySearchProviderTask.java      |  14 +-
 .../twitter/provider/TwitterEngagersProvider.java  | 175 +++++++++++----------
 .../provider/TwitterFollowersIdsProviderTask.java  |  18 ++-
 .../provider/TwitterFollowersListProviderTask.java |  20 ++-
 .../twitter/provider/TwitterFollowingProvider.java | 149 +++++++++---------
 .../provider/TwitterFriendsIdsProviderTask.java    |  17 +-
 .../provider/TwitterFriendsListProviderTask.java   |  23 ++-
 .../twitter/provider/TwitterRetweetsTask.java      |   9 +-
 .../twitter/provider/TwitterTimelineProvider.java  |  99 +++++++-----
 .../provider/TwitterTimelineProviderTask.java      |  16 +-
 .../provider/TwitterUserInformationProvider.java   |  60 +++----
 14 files changed, 489 insertions(+), 364 deletions(-)

diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
index 10a2661..2c76a5f 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
@@ -25,11 +25,15 @@ import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.util.ExecutorUtils;
+import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.StreamsJacksonMapperConfiguration;
 import org.apache.streams.twitter.config.SevenDaySearchProviderConfiguration;
 import org.apache.streams.twitter.api.SevenDaySearchRequest;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -54,10 +58,13 @@ import java.io.PrintStream;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,7 +76,7 @@ import java.util.stream.Stream;
 /**
  * Retrieve recent posts from a list of user ids or names.
  */
-public class SevenDaySearchProvider implements StreamsProvider, Serializable {
+public class SevenDaySearchProvider implements Callable<Iterator<Tweet>>, 
StreamsProvider, Serializable {
 
   private static final String STREAMS_ID = "SevenDaySearchProvider";
 
@@ -91,11 +98,14 @@ public class SevenDaySearchProvider implements 
StreamsProvider, Serializable {
 
   protected Twitter client;
 
-  protected ListeningExecutorService executor;
+  protected ExecutorService executor;
 
-  protected final AtomicBoolean running = new AtomicBoolean();
+  StreamsConfiguration streamsConfiguration;
+
+  private List<Callable<Object>> tasks = new ArrayList<>();
+  private List<Future<Object>> futures = new ArrayList<>();
 
-  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+  protected final AtomicBoolean running = new AtomicBoolean();
 
   /**
    * To use from command line:
@@ -136,26 +146,21 @@ public class SevenDaySearchProvider implements 
StreamsProvider, Serializable {
     SevenDaySearchProviderConfiguration config = new 
ComponentConfigurator<>(SevenDaySearchProviderConfiguration.class).detectConfiguration();
     SevenDaySearchProvider provider = new SevenDaySearchProvider(config);
 
-    ObjectMapper mapper = new 
StreamsJacksonMapper(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList()));
-
     PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
-    provider.prepare(config);
-    provider.startStream();
-    do {
-      
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
-      for (StreamsDatum datum : provider.readCurrent()) {
-        String json;
-        try {
-          json = mapper.writeValueAsString(datum.getDocument());
-          outStream.println(json);
-        } catch (JsonProcessingException ex) {
-          System.err.println(ex.getMessage());
-        }
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance(new 
StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList())));
+
+    Iterator<Tweet> results = provider.call();
+
+    results.forEachRemaining(d -> {
+      try {
+        outStream.println(mapper.writeValueAsString(d));
+      } catch( Exception e ) {
+        LOGGER.warn("Exception", e);
       }
-    }
-    while ( provider.isRunning() );
-    provider.cleanUp();
+    });
+
     outStream.flush();
+
   }
 
   public SevenDaySearchProvider() {
@@ -184,7 +189,7 @@ public class SevenDaySearchProvider implements 
StreamsProvider, Serializable {
 
     try {
       lock.writeLock().lock();
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -200,7 +205,7 @@ public class SevenDaySearchProvider implements 
StreamsProvider, Serializable {
     request = new SevenDaySearchRequest();
     request.setQ(config.getQ());
 
-    StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration();
+    streamsConfiguration = StreamsConfigurator.detectConfiguration();
 
     try {
       client = getTwitterClient();
@@ -211,7 +216,7 @@ public class SevenDaySearchProvider implements 
StreamsProvider, Serializable {
     Objects.requireNonNull(client);
 
     executor = MoreExecutors.listeningDecorator(
-        TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(
+      ExecutorUtils.newFixedThreadPoolWithQueueSize(
             config.getThreadsPerProvider().intValue(),
             streamsConfiguration.getQueueSize().intValue()
         )
@@ -224,24 +229,32 @@ public class SevenDaySearchProvider implements 
StreamsProvider, Serializable {
   @Override
   public void startStream() {
 
-    LOGGER.debug("{} startStream", STREAMS_ID);
+    Objects.requireNonNull(executor);
+
+    LOGGER.info("startStream");
 
     running.set(true);
 
-    executor.shutdown();
+    LOGGER.info("running: {}", running.get());
+
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+
+    LOGGER.info("running: {}", running.get());
 
   }
 
   protected void submitSearchThread() {
 
-      SevenDaySearchProviderTask providerTask = new SevenDaySearchProviderTask(
+      Callable providerTask = new SevenDaySearchProviderTask(
           this,
           client,
         request
       );
-      ListenableFuture future = executor.submit(providerTask);
+      LOGGER.info("Thread Created: {}", request);
+      tasks.add(providerTask);
+      Future future = executor.submit(providerTask);
       futures.add(future);
-      LOGGER.info("Thread Submitted: {}", providerTask.request);
+      LOGGER.info("Thread Submitted: {}", request);
 
   }
 
@@ -255,8 +268,7 @@ public class SevenDaySearchProvider implements 
StreamsProvider, Serializable {
     try {
       lock.writeLock().lock();
       result = new StreamsResultSet(providerQueue);
-      result.setCounter(new DatumStatusCounter());
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -273,10 +285,6 @@ public class SevenDaySearchProvider implements 
StreamsProvider, Serializable {
 
   }
 
-  protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<StreamsDatum>();
-  }
-
   public StreamsResultSet readNew(BigInteger sequence) {
     LOGGER.debug("{} readNew", STREAMS_ID);
     throw new NotImplementedException();
@@ -299,35 +307,30 @@ public class SevenDaySearchProvider implements 
StreamsProvider, Serializable {
 
   @Override
   public void cleanUp() {
-    shutdownAndAwaitTermination(executor);
-  }
-
-  void shutdownAndAwaitTermination(ExecutorService pool) {
-    pool.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-          System.err.println("Pool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      pool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
   }
 
   @Override
   public boolean isRunning() {
-    if (providerQueue.isEmpty() && executor.isTerminated() && 
Futures.allAsList(futures).isDone()) {
-      LOGGER.info("Completed");
+    LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
+    LOGGER.debug("tasks.size(): {}", tasks.size());
+    LOGGER.debug("futures.size(): {}", futures.size());
+    if ( tasks.size() > 0 && tasks.size() == futures.size() && 
executor.isTerminated() ) {
       running.set(false);
-      LOGGER.info("Exiting");
     }
+    LOGGER.debug("isRunning: {}", running.get());
     return running.get();
   }
+
+  @Override
+  public Iterator<Tweet> call() throws Exception {
+    prepare(config);
+    startStream();
+    do {
+      
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
+    } while ( isRunning());
+    cleanUp();
+    return providerQueue.stream().map( x -> 
((Tweet)x.getDocument())).distinct().iterator();
+
+  }
 }
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
index 8bb0b0f..ac094dc 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
@@ -32,14 +32,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
  *  Retrieve recent posts for a single user id.
  */
-public class SevenDaySearchProviderTask implements Runnable {
+public class SevenDaySearchProviderTask implements Callable<Iterator<Tweet>>, 
Runnable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SevenDaySearchProviderTask.class);
 
@@ -48,6 +50,7 @@ public class SevenDaySearchProviderTask implements Runnable {
   protected SevenDaySearchProvider provider;
   protected Twitter client;
   protected SevenDaySearchRequest request;
+  protected List<Tweet> responseList;
 
   /**
    * SevenDaySearchProviderTask constructor.
@@ -76,6 +79,8 @@ public class SevenDaySearchProviderTask implements Runnable {
 
       List<Tweet> statuses = response.getStatuses();
 
+      responseList.addAll(statuses);
+
       last_count = statuses.size();
       if( statuses.size() > 0 ) {
 
@@ -109,6 +114,10 @@ public class SevenDaySearchProviderTask implements 
Runnable {
             && page_count <= provider.getConfig().getMaxPages());
   }
 
-
+  @Override
+  public Iterator<Tweet> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 
 }
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
index 080a30d..287ab4f 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
@@ -25,11 +25,15 @@ import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.util.ExecutorUtils;
+import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.StreamsJacksonMapperConfiguration;
 import org.apache.streams.twitter.config.ThirtyDaySearchProviderConfiguration;
 import org.apache.streams.twitter.api.ThirtyDaySearchRequest;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -54,10 +58,13 @@ import java.io.PrintStream;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,7 +76,7 @@ import java.util.stream.Stream;
 /**
  * Retrieve recent posts from a list of user ids or names.
  */
-public class ThirtyDaySearchProvider implements StreamsProvider, Serializable {
+public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, 
StreamsProvider, Serializable {
 
   private static final String STREAMS_ID = "ThirtyDaySearchProvider";
 
@@ -91,11 +98,14 @@ public class ThirtyDaySearchProvider implements 
StreamsProvider, Serializable {
 
   protected Twitter client;
 
-  protected ListeningExecutorService executor;
+  protected ExecutorService executor;
 
-  protected final AtomicBoolean running = new AtomicBoolean();
+  private List<Callable<Object>> tasks = new ArrayList<>();
+  private List<Future<Object>> futures = new ArrayList<>();
+
+  StreamsConfiguration streamsConfiguration;
 
-  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+  protected final AtomicBoolean running = new AtomicBoolean();
 
   /**
    * To use from command line:
@@ -137,26 +147,21 @@ public class ThirtyDaySearchProvider implements 
StreamsProvider, Serializable {
     ThirtyDaySearchProviderConfiguration config = new 
ComponentConfigurator<>(ThirtyDaySearchProviderConfiguration.class).detectConfiguration();
     ThirtyDaySearchProvider provider = new ThirtyDaySearchProvider(config);
 
-    ObjectMapper mapper = new 
StreamsJacksonMapper(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList()));
-
     PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
-    provider.prepare(config);
-    provider.startStream();
-    do {
-      
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
-      for (StreamsDatum datum : provider.readCurrent()) {
-        String json;
-        try {
-          json = mapper.writeValueAsString(datum.getDocument());
-          outStream.println(json);
-        } catch (JsonProcessingException ex) {
-          System.err.println(ex.getMessage());
-        }
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance(new 
StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList())));
+
+    Iterator<Tweet> results = provider.call();
+
+    results.forEachRemaining(d -> {
+      try {
+        outStream.println(mapper.writeValueAsString(d));
+      } catch( Exception e ) {
+        LOGGER.warn("Exception", e);
       }
-    }
-    while ( provider.isRunning() );
-    provider.cleanUp();
+    });
+
     outStream.flush();
+
   }
 
   public ThirtyDaySearchProvider(ThirtyDaySearchProviderConfiguration config) {
@@ -181,7 +186,7 @@ public class ThirtyDaySearchProvider implements 
StreamsProvider, Serializable {
 
     try {
       lock.writeLock().lock();
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -197,7 +202,7 @@ public class ThirtyDaySearchProvider implements 
StreamsProvider, Serializable {
     request = new ThirtyDaySearchRequest();
     request.setQuery(config.getQuery());
 
-    StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration();
+    streamsConfiguration = StreamsConfigurator.detectConfiguration();
 
     try {
       client = getTwitterClient();
@@ -208,7 +213,7 @@ public class ThirtyDaySearchProvider implements 
StreamsProvider, Serializable {
     Objects.requireNonNull(client);
 
     executor = MoreExecutors.listeningDecorator(
-        TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(
+      ExecutorUtils.newFixedThreadPoolWithQueueSize(
             config.getThreadsPerProvider().intValue(),
             streamsConfiguration.getQueueSize().intValue()
         )
@@ -221,25 +226,32 @@ public class ThirtyDaySearchProvider implements 
StreamsProvider, Serializable {
   @Override
   public void startStream() {
 
-    LOGGER.debug("{} startStream", STREAMS_ID);
+    Objects.requireNonNull(executor);
+
+    LOGGER.info("startStream");
 
     running.set(true);
 
-    executor.shutdown();
+    LOGGER.info("running: {}", running.get());
+
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+
+    LOGGER.info("running: {}", running.get());
 
   }
 
   protected void submitSearchThread() {
 
-    ThirtyDaySearchProviderTask providerTask = new ThirtyDaySearchProviderTask(
+    Callable providerTask = new ThirtyDaySearchProviderTask(
           this,
           client,
         request
       );
-      ListenableFuture future = executor.submit(providerTask);
-      futures.add(future);
-      LOGGER.info("Thread Submitted: {}", providerTask.request);
-
+    LOGGER.info("Thread Created: {}", request);
+    tasks.add(providerTask);
+    Future future = executor.submit(providerTask);
+    futures.add(future);
+    LOGGER.info("Thread Submitted: {}", request);
   }
 
   @Override
@@ -252,8 +264,7 @@ public class ThirtyDaySearchProvider implements 
StreamsProvider, Serializable {
     try {
       lock.writeLock().lock();
       result = new StreamsResultSet(providerQueue);
-      result.setCounter(new DatumStatusCounter());
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -270,10 +281,6 @@ public class ThirtyDaySearchProvider implements 
StreamsProvider, Serializable {
 
   }
 
-  protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<StreamsDatum>();
-  }
-
   public StreamsResultSet readNew(BigInteger sequence) {
     LOGGER.debug("{} readNew", STREAMS_ID);
     throw new NotImplementedException();
@@ -296,35 +303,29 @@ public class ThirtyDaySearchProvider implements 
StreamsProvider, Serializable {
 
   @Override
   public void cleanUp() {
-    shutdownAndAwaitTermination(executor);
-  }
-
-  void shutdownAndAwaitTermination(ExecutorService pool) {
-    pool.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-          System.err.println("Pool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      pool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
   }
 
   @Override
   public boolean isRunning() {
-    if (providerQueue.isEmpty() && executor.isTerminated() && 
Futures.allAsList(futures).isDone()) {
-      LOGGER.info("Completed");
+    LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
+    LOGGER.debug("tasks.size(): {}", tasks.size());
+    LOGGER.debug("futures.size(): {}", futures.size());
+    if ( tasks.size() > 0 && tasks.size() == futures.size() && 
executor.isShutdown() && executor.isTerminated() ) {
       running.set(false);
-      LOGGER.info("Exiting");
     }
+    LOGGER.debug("isRunning: {}", running.get());
     return running.get();
   }
+
+  @Override
+  public Iterator<Tweet> call() throws Exception {
+    prepare(config);
+    startStream();
+    do {
+      
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
+    } while ( isRunning());
+    cleanUp();
+    return providerQueue.stream().map( x -> 
((Tweet)x.getDocument())).iterator();
+  }
 }
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
index 07b40e5..dea770b 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
@@ -31,14 +31,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
  *  Retrieve recent posts for a single user id.
  */
-public class ThirtyDaySearchProviderTask implements Runnable {
+public class ThirtyDaySearchProviderTask implements Callable<Iterator<Tweet>>, 
Runnable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ThirtyDaySearchProviderTask.class);
 
@@ -47,6 +49,7 @@ public class ThirtyDaySearchProviderTask implements Runnable {
   protected ThirtyDaySearchProvider provider;
   protected Twitter client;
   protected ThirtyDaySearchRequest request;
+  protected List<Tweet> responseList;
 
   /**
    * ThirtyDaySearchProviderTask constructor.
@@ -75,6 +78,8 @@ public class ThirtyDaySearchProviderTask implements Runnable {
 
       List<Tweet> statuses = response.getResults();
 
+      responseList.addAll(statuses);
+
       last_count = statuses.size();
       if( statuses.size() > 0 ) {
 
@@ -107,6 +112,9 @@ public class ThirtyDaySearchProviderTask implements 
Runnable {
         && page_count <= provider.getConfig().getMaxPages());
   }
 
-
-
+  @Override
+  public Iterator<Tweet> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEngagersProvider.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEngagersProvider.java
index bbb2881..e244f50 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEngagersProvider.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEngagersProvider.java
@@ -24,8 +24,11 @@ import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.util.ExecutorUtils;
+import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.StreamsJacksonMapperConfiguration;
+import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.config.TwitterEngagersProviderConfiguration;
 import org.apache.streams.twitter.config.TwitterTimelineProviderConfiguration;
 import org.apache.streams.twitter.api.RetweetsRequest;
@@ -36,6 +39,7 @@ import org.apache.streams.twitter.pojo.User;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -58,8 +62,11 @@ import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -73,7 +80,7 @@ import static 
org.apache.streams.twitter.provider.TwitterUserInformationProvider
 /**
  * Retrieve posts from a list of user ids or names, then provide all of the 
users who retweeted those posts.
  */
-public class TwitterEngagersProvider extends TwitterTimelineProvider 
implements StreamsProvider, Serializable {
+public class TwitterEngagersProvider implements Callable<Iterator<User>>, 
StreamsProvider, Serializable {
 
   private static final String STREAMS_ID = "TwitterEngagersProvider";
 
@@ -91,14 +98,19 @@ public class TwitterEngagersProvider extends 
TwitterTimelineProvider implements
 
   protected final AtomicBoolean running = new AtomicBoolean();
 
-  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+  protected Twitter client;
 
-  protected ListeningExecutorService executor;
+  private List<Callable<Object>> tasks = new ArrayList<>();
+  private List<Future<Object>> futures = new ArrayList<>();
+
+  public static ExecutorService executor;
 
   StreamsConfiguration streamsConfiguration;
 
   RetweetsRequest baseRetweetsRequest;
 
+  TwitterTimelineProvider timelineProvider;
+
   /**
    * To use from command line:
    *
@@ -134,29 +146,22 @@ public class TwitterEngagersProvider extends 
TwitterTimelineProvider implements
     Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, 
ConfigParseOptions.defaults().setAllowMissing(false)).withFallback(StreamsConfigurator.getConfig());
     StreamsConfigurator.addConfig(testResourceConfig);
 
-    StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration();
+    PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance(new 
StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList())));
+
     TwitterEngagersProviderConfiguration config = new 
ComponentConfigurator<>(TwitterEngagersProviderConfiguration.class).detectConfiguration();
     TwitterEngagersProvider provider = new TwitterEngagersProvider(config);
 
-    ObjectMapper mapper = StreamsJacksonMapper.getInstance(new 
StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList())));
+    Iterator<User> results = provider.call();
 
-    PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
-    provider.prepare(config);
-    provider.startStream();
-    do {
-      
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
-      for (StreamsDatum datum : provider.readCurrent()) {
-        String json;
-        try {
-          json = mapper.writeValueAsString(datum.getDocument());
-          outStream.println(json);
-        } catch (JsonProcessingException ex) {
-          System.err.println(ex.getMessage());
-        }
+    results.forEachRemaining(d -> {
+      try {
+        outStream.println(mapper.writeValueAsString(d));
+      } catch( Exception e ) {
+        LOGGER.warn("Exception", e);
       }
-    }
-    while ( provider.isRunning() );
-    provider.cleanUp();
+    });
+
     outStream.flush();
   }
 
@@ -176,27 +181,37 @@ public class TwitterEngagersProvider extends 
TwitterTimelineProvider implements
   @Override
   public void prepare(Object configurationObject) {
 
+    timelineProvider = new TwitterTimelineProvider(config);
+
     if( configurationObject instanceof TwitterEngagersProviderConfiguration ) {
       this.config = (TwitterEngagersProviderConfiguration)configurationObject;
-      super.prepare(MAPPER.convertValue(this.config, 
TwitterTimelineProviderConfiguration.class));
+      timelineProvider.prepare(MAPPER.convertValue(this.config, 
TwitterTimelineProviderConfiguration.class));
     } else if( configurationObject instanceof 
TwitterTimelineProviderConfiguration ) {
-      super.prepare(configurationObject);
+      timelineProvider.prepare(configurationObject);
       this.config = MAPPER.convertValue(this.config, 
TwitterEngagersProviderConfiguration.class);
     } else {
-      super.prepare(null);
+      timelineProvider.prepare(null);
     }
 
     streamsConfiguration = 
StreamsConfigurator.detectConfiguration(StreamsConfigurator.getConfig());
 
     try {
       lock.writeLock().lock();
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
 
+    try {
+      client = getTwitterClient();
+    } catch (InstantiationException e) {
+      LOGGER.error("InstantiationException", e);
+    }
+
+    Objects.requireNonNull(client);
+
     executor = MoreExecutors.listeningDecorator(
-      TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(
+      ExecutorUtils.newFixedThreadPoolWithQueueSize(
         config.getThreadsPerProvider().intValue(),
         streamsConfiguration.getQueueSize().intValue()
       )
@@ -206,41 +221,50 @@ public class TwitterEngagersProvider extends 
TwitterTimelineProvider implements
 
   }
 
+  /**
+   * Get the Twitter client built from this provider's TwitterEngagersProviderConfiguration.
+   * @return Twitter client instance for the current config
+   */
+  public Twitter getTwitterClient() throws InstantiationException {
+
+    return Twitter.getInstance(config);
+
+  }
+
   @Override
   public void startStream() {
 
     LOGGER.debug("{} startStream", STREAMS_ID);
 
-    super.startStream();
+    Iterator<Tweet> timelineIterator = timelineProvider.call();
 
-    do {
-      
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
-      Iterator<StreamsDatum> iterator = super.readCurrent().iterator();
-      while (iterator.hasNext()) {
-        StreamsDatum datum = iterator.next();
-        Tweet tweet = (Tweet) datum.getDocument();
-        submitRetweeterIdsTaskThread(tweet.getId());
-      }
-    }
-    while ( super.isRunning() );
-    super.cleanUp();
-    executor.shutdown();
+    List<Tweet> timelineList = Lists.newArrayList(timelineIterator);
+
+    LOGGER.info("running: {}", running.get());
+
+    timelineList.forEach(tweet -> submitRetweeterIdsTaskThread(tweet.getId()));
+
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+
+    LOGGER.info("running: {}", running.get());
 
   }
 
   protected void submitRetweeterIdsTaskThread( Long postId ) {
 
+    Callable<Object> callable = createTask(postId);
+    LOGGER.info("Thread Created: {}", postId);
+    tasks.add(callable);
+    futures.add(executor.submit(callable));
+    LOGGER.info("Thread Submitted: {}", postId);
+
+  }
+
+  protected Callable createTask( Long postId ) {
     RetweetsRequest request = new 
ComponentConfigurator<>(RetweetsRequest.class).detectConfiguration();
     request.setId(postId);
-      TwitterRetweetsTask providerTask = new TwitterRetweetsTask(
-      this,
-      client,
-      request
-    );
-    ListenableFuture future = executor.submit(providerTask);
-    super.futures.add(future);
-    LOGGER.info("Thread Submitted: {}", providerTask.request);
-
+    Callable callable = new TwitterRetweetsTask(this, client, request);
+    return callable;
   }
 
   @Override
@@ -252,7 +276,7 @@ public class TwitterEngagersProvider extends 
TwitterTimelineProvider implements
 
     try {
       lock.writeLock().lock();
-      Queue<StreamsDatum> resultQueue = constructQueue();
+      Queue<StreamsDatum> resultQueue = QueueUtils.constructQueue();
       providerQueue.iterator().forEachRemaining(
         datum -> {
           Tweet tweet = ((Tweet) datum.getDocument());
@@ -266,12 +290,12 @@ public class TwitterEngagersProvider extends 
TwitterTimelineProvider implements
         }
       );
       result = new StreamsResultSet(resultQueue);
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
 
-    if ( result.size() == 0 && providerQueue.isEmpty() && 
executor.isTerminated() ) {
+    if ( result.size() == 0 && providerQueue.isEmpty() && 
executor.isShutdown() && executor.isTerminated() ) {
       LOGGER.info("Finished.  Cleaning up...");
 
       running.set(false);
@@ -283,10 +307,6 @@ public class TwitterEngagersProvider extends 
TwitterTimelineProvider implements
 
   }
 
-  protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<StreamsDatum>();
-  }
-
   public StreamsResultSet readNew(BigInteger sequence) {
     LOGGER.debug("{} readNew", STREAMS_ID);
     throw new NotImplementedException();
@@ -299,36 +319,31 @@ public class TwitterEngagersProvider extends 
TwitterTimelineProvider implements
 
   @Override
   public void cleanUp() {
-    super.cleanUp();
-    shutdownAndAwaitTermination(executor);
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
   }
 
-  void shutdownAndAwaitTermination(ExecutorService pool) {
-    pool.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-          System.err.println("Pool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      pool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  @Override
   public boolean isRunning() {
-    if (providerQueue.isEmpty() && executor.isTerminated() && 
Futures.allAsList(futures).isDone()) {
-      LOGGER.info("Completed");
+    LOGGER.debug("timelineProvider.isRunning: {}", 
timelineProvider.isRunning());
+    LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty());
+    LOGGER.debug("providerQueue.size: {}", providerQueue.size());
+    LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
+    LOGGER.debug("tasks.size(): {}", tasks.size());
+    LOGGER.debug("futures.size(): {}", futures.size());
+    if ( timelineProvider.isRunning() == false && tasks.size() > 0 && 
tasks.size() == futures.size() && executor.isTerminated() ) {
       running.set(false);
-      LOGGER.info("Exiting");
     }
+    LOGGER.debug("isRunning: {}", running.get());
     return running.get();
   }
+
+  @Override
+  public Iterator<User> call() {
+    prepare(config);
+    startStream();
+    do {
+      
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
+    } while ( isRunning());
+    cleanUp();
+    return providerQueue.stream().map( x -> 
((Tweet)x.getDocument()).getUser()).distinct().iterator();
+  }
 }
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java
index 7acbeef..0b2305a 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java
@@ -22,6 +22,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.api.FollowersIdsRequest;
 import org.apache.streams.twitter.api.FollowersIdsResponse;
+import org.apache.streams.twitter.api.FriendsIdsResponse;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.pojo.Follow;
 import org.apache.streams.twitter.pojo.User;
@@ -32,10 +33,15 @@ import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+
 /**
  *  Retrieve friend or follower connections for a single user id.
  */
-public class TwitterFollowersIdsProviderTask implements Runnable {
+public class TwitterFollowersIdsProviderTask implements 
Callable<Iterator<FollowersIdsResponse>>, Runnable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TwitterFollowersIdsProviderTask.class);
 
@@ -44,6 +50,7 @@ public class TwitterFollowersIdsProviderTask implements 
Runnable {
   protected Twitter client;
   protected TwitterFollowingProvider provider;
   protected FollowersIdsRequest request;
+  protected List<FollowersIdsResponse> responseList;
 
   /**
    * TwitterFollowingProviderTask constructor.
@@ -62,6 +69,8 @@ public class TwitterFollowersIdsProviderTask implements 
Runnable {
 
     Preconditions.checkArgument(request.getId() != null || 
request.getScreenName() != null);
 
+    responseList = new ArrayList<>();
+
     LOGGER.info("Thread Starting: {}", request.toString());
 
     getFollowersIds(request);
@@ -81,6 +90,8 @@ public class TwitterFollowersIdsProviderTask implements 
Runnable {
 
       FollowersIdsResponse response = client.ids(request);
 
+      responseList.add(response);
+
       last_count = response.getIds().size();
 
       if (response.getIds().size() > 0) {
@@ -122,4 +133,9 @@ public class TwitterFollowersIdsProviderTask implements 
Runnable {
             && page_count <= provider.getConfig().getMaxPages());
   }
 
+  @Override
+  public Iterator<FollowersIdsResponse> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
index 5195a13..94ec455 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
@@ -28,13 +28,19 @@ import org.apache.streams.twitter.pojo.User;
 import org.apache.streams.util.ComponentUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+
 /**
  *  Retrieve friend or follower connections for a single user id.
  */
-public class TwitterFollowersListProviderTask implements Runnable {
+public class TwitterFollowersListProviderTask implements 
Callable<Iterator<FollowersListResponse>>, Runnable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TwitterFollowersListProviderTask.class);
 
@@ -43,6 +49,7 @@ public class TwitterFollowersListProviderTask implements 
Runnable {
   protected Twitter client;
   protected TwitterFollowingProvider provider;
   protected FollowersListRequest request;
+  protected List<FollowersListResponse> responseList;
 
   /**
    * TwitterFollowersListProviderTask constructor.
@@ -59,8 +66,12 @@ public class TwitterFollowersListProviderTask implements 
Runnable {
   @Override
   public void run() {
 
+    Preconditions.checkArgument(request.getId() != null || 
request.getScreenName() != null);
+
     LOGGER.info("Thread Starting: {}", request.toString());
 
+    responseList = new ArrayList<>();
+
     getFollowersList(request);
 
     LOGGER.info("Thread Finished: {}", request.toString());
@@ -78,6 +89,8 @@ public class TwitterFollowersListProviderTask implements 
Runnable {
 
       FollowersListResponse response = client.list(request);
 
+      responseList.add(response);
+
       last_count = response.getUsers().size();
 
       if (response.getUsers().size() > 0) {
@@ -118,4 +131,9 @@ public class TwitterFollowersListProviderTask implements 
Runnable {
             && page_count <= provider.getConfig().getMaxPages());
   }
 
+  @Override
+  public Iterator<FollowersListResponse> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 7e6949c..3b45cf3 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -25,6 +25,8 @@ import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.util.ExecutorUtils;
+import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.StreamsJacksonMapperConfiguration;
 import org.apache.streams.twitter.config.TwitterFollowingConfiguration;
@@ -34,6 +36,7 @@ import org.apache.streams.twitter.api.FriendsIdsRequest;
 import org.apache.streams.twitter.api.FriendsListRequest;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Follow;
 import org.apache.streams.twitter.pojo.User;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -43,6 +46,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
@@ -59,26 +63,34 @@ import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static sun.misc.PostVMInitHook.run;
 
 /**
  * Retrieve all follow adjacencies from a list of user ids or names.
  */
-public class TwitterFollowingProvider implements StreamsProvider, Serializable 
{
+public class TwitterFollowingProvider implements Callable<Iterator<Follow>>, 
StreamsProvider, Serializable {
 
   public static final String STREAMS_ID = "TwitterFollowingProvider";
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TwitterFollowingProvider.class);
 
   private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
+  private StreamsConfiguration streamsConfiguration;
   private TwitterFollowingConfiguration config;
 
   protected List<String> names = new ArrayList<>();
@@ -86,9 +98,10 @@ public class TwitterFollowingProvider implements 
StreamsProvider, Serializable {
 
   protected Twitter client;
 
-  protected ListeningExecutorService executor;
+  public static ExecutorService executor;
 
-  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+  private List<Callable<Object>> tasks = new ArrayList<>();
+  private List<Future<Object>> futures = new ArrayList<>();
 
   protected final AtomicBoolean running = new AtomicBoolean();
 
@@ -129,31 +142,22 @@ public class TwitterFollowingProvider implements 
StreamsProvider, Serializable {
     Config configFile = ConfigFactory.parseFileAnySyntax(file, 
ConfigParseOptions.defaults());
     StreamsConfigurator.addConfig(configFile);
 
-    StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration();
+    PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance(new 
StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList())));
+
     TwitterFollowingConfiguration config = new 
ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration();
     TwitterFollowingProvider provider = new TwitterFollowingProvider(config);
 
-    StreamsJacksonMapperConfiguration mapperConfiguration = new 
StreamsJacksonMapperConfiguration()
-        
.withDateFormats(Collections.singletonList(TwitterDateTimeFormat.TWITTER_FORMAT));
-    ObjectMapper mapper = 
StreamsJacksonMapper.getInstance(mapperConfiguration);
+    Iterator<Follow> results = provider.call();
 
-    PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
-    provider.prepare(config);
-    provider.startStream();
-    do {
-      Thread.sleep(streamsConfiguration.getBatchFrequencyMs());
-      for (StreamsDatum datum : provider.readCurrent()) {
-        String json;
-        try {
-          json = mapper.writeValueAsString(datum.getDocument());
-          outStream.println(json);
-        } catch (JsonProcessingException ex) {
-          System.err.println(ex.getMessage());
-        }
+    results.forEachRemaining(d -> {
+      try {
+        outStream.println(mapper.writeValueAsString(d));
+      } catch( Exception e ) {
+        LOGGER.warn("Exception", e);
       }
-    }
-    while ( provider.isRunning());
-    provider.cleanUp();
+    });
+
     outStream.flush();
   }
 
@@ -178,6 +182,8 @@ public class TwitterFollowingProvider implements 
StreamsProvider, Serializable {
 
   public void prepare(Object configurationObject) {
 
+    this.streamsConfiguration = StreamsConfigurator.detectConfiguration();
+
     if( configurationObject instanceof TwitterFollowingConfiguration) {
       this.config = (TwitterFollowingConfiguration) configurationObject;
     }
@@ -203,7 +209,7 @@ public class TwitterFollowingProvider implements 
StreamsProvider, Serializable {
 
     try {
       lock.writeLock().lock();
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -228,7 +234,7 @@ public class TwitterFollowingProvider implements 
StreamsProvider, Serializable {
     Objects.requireNonNull(getConfig().getEndpoint());
 
     executor = MoreExecutors.listeningDecorator(
-        TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(
+      ExecutorUtils.newFixedThreadPoolWithQueueSize(
             config.getThreadsPerProvider().intValue(),
             streamsConfiguration.getQueueSize().intValue()
         )
@@ -237,12 +243,18 @@ public class TwitterFollowingProvider implements 
StreamsProvider, Serializable {
     Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || 
getConfig().getEndpoint().equals("followers"));
 
     for (Long id : ids) {
-      submitTask(createTask(id, null));
+      Callable<Object> callable = createTask(id, null);
+      LOGGER.info("Thread Created: {}", id);
+      tasks.add(callable);
+      futures.add(executor.submit(callable));
       LOGGER.info("Thread Submitted: {}", id);
     }
 
     for (String name : names) {
-      submitTask(createTask(null, name));
+      Callable<Object> callable = createTask(null, name);
+      LOGGER.info("Thread Created: {}", name);
+      tasks.add(callable);
+      futures.add(executor.submit(callable));
       LOGGER.info("Thread Submitted: {}", name);
     }
 
@@ -256,45 +268,42 @@ public class TwitterFollowingProvider implements 
StreamsProvider, Serializable {
 
     running.set(true);
 
-    LOGGER.info("isRunning");
+    LOGGER.info("running: {}", running.get());
+
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
 
-    executor.shutdown();
+    LOGGER.info("running: {}", running.get());
 
   }
 
-  protected Runnable createTask(Long id, String name) {
+  protected Callable createTask(Long id, String name) {
     if( config.getEndpoint().equals("friends") && config.getIdsOnly() == true 
) {
       FriendsIdsRequest request = (FriendsIdsRequest)new 
FriendsIdsRequest().withId(id).withScreenName(name);
       return new TwitterFriendsIdsProviderTask(
-              this,
-              client,
-              request);
+        this,
+        client,
+        request);
     } else if( config.getEndpoint().equals("friends") && config.getIdsOnly() 
== false ) {
       FriendsListRequest request = (FriendsListRequest)new 
FriendsListRequest().withId(id).withScreenName(name);
       return new TwitterFriendsListProviderTask(
-          this,
-          client,
-          request);
+        this,
+        client,
+        request);
     } else if( config.getEndpoint().equals("followers") && config.getIdsOnly() 
== true ) {
       FollowersIdsRequest request = (FollowersIdsRequest)new 
FollowersIdsRequest().withId(id).withScreenName(name);
       return new TwitterFollowersIdsProviderTask(
-          this,
-          client,
-          request);
+        this,
+        client,
+        request);
     } else if( config.getEndpoint().equals("followers") && config.getIdsOnly() 
== false ) {
       FollowersListRequest request = (FollowersListRequest)new 
FollowersListRequest().withId(id).withScreenName(name);
       return new TwitterFollowersListProviderTask(
-          this,
-          client,
-          request);
+        this,
+        client,
+        request);
     } else return null;
   }
 
-  protected void submitTask(Runnable providerTask) {
-    ListenableFuture future = executor.submit(providerTask);
-    futures.add(future);
-  }
-
   protected Twitter getTwitterClient() throws InstantiationException {
     return Twitter.getInstance(config);
   }
@@ -306,8 +315,7 @@ public class TwitterFollowingProvider implements 
StreamsProvider, Serializable {
     try {
       lock.writeLock().lock();
       result = new StreamsResultSet(providerQueue);
-      result.setCounter(new DatumStatusCounter());
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
       LOGGER.debug("readCurrent: {} Documents", result.size());
     } finally {
       lock.writeLock().unlock();
@@ -332,40 +340,29 @@ public class TwitterFollowingProvider implements 
StreamsProvider, Serializable {
   }
 
   public boolean isRunning() {
-    if ( providerQueue.isEmpty() && executor.isTerminated() && 
Futures.allAsList(futures).isDone() ) {
-      LOGGER.info("All Threads Completed");
+    LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
+    LOGGER.debug("tasks.size(): {}", tasks.size());
+    LOGGER.debug("futures.size(): {}", futures.size());
+    if ( tasks.size() > 0 && tasks.size() == futures.size() && 
executor.isShutdown() && executor.isTerminated() ) {
       running.set(false);
-      LOGGER.info("Exiting");
     }
+    LOGGER.debug("isRunning: {}", running.get());
     return running.get();
   }
 
-  // abstract this out
-  protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<>();
+  public void cleanUp() {
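+    // intentionally empty: the executor is shut down via ExecutorUtils.shutdownAndAwaitTermination elsewhere in this class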
   }
 
-  // abstract this out
-  void shutdownAndAwaitTermination(ExecutorService pool) {
-    pool.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-          System.err.println("Pool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      pool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
+  @Override
+  public Iterator<Follow> call() {
+    prepare(config);
+    startStream();
+    do {
+      
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
+    } while ( isRunning());
+    cleanUp();
+    return providerQueue.stream().map( x -> 
(Follow)x.getDocument()).iterator();
   }
 
-  public void cleanUp() {
-    shutdownAndAwaitTermination(executor);
-  }
 }
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
index 5a53123..402e825 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
@@ -32,10 +32,15 @@ import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+
 /**
  *  Retrieve friend or follower connections for a single user id.
  */
-public class TwitterFriendsIdsProviderTask implements Runnable {
+public class TwitterFriendsIdsProviderTask implements 
Callable<Iterator<FriendsIdsResponse>>, Runnable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TwitterFriendsIdsProviderTask.class);
 
@@ -44,6 +49,7 @@ public class TwitterFriendsIdsProviderTask implements 
Runnable {
   protected Twitter client;
   protected TwitterFollowingProvider provider;
   protected FriendsIdsRequest request;
+  protected List<FriendsIdsResponse> responseList;
 
   /**
    * TwitterFollowingProviderTask constructor.
@@ -62,6 +68,8 @@ public class TwitterFriendsIdsProviderTask implements 
Runnable {
 
     Preconditions.checkArgument(request.getId() != null || 
request.getScreenName() != null);
 
+    responseList = new ArrayList<>();
+
     LOGGER.info("Thread Starting: {}", request.toString());
 
     getFriendsIds(request);
@@ -81,6 +89,8 @@ public class TwitterFriendsIdsProviderTask implements 
Runnable {
 
       FriendsIdsResponse response = client.ids(request);
 
+      responseList.add(response);
+
       last_count = response.getIds().size();
 
       if (response.getIds().size() > 0) {
@@ -123,4 +133,9 @@ public class TwitterFriendsIdsProviderTask implements 
Runnable {
             && page_count <= provider.getConfig().getMaxPages());
   }
 
+  @Override
+  public Iterator<FriendsIdsResponse> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
index e116f2f..b2917e2 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
@@ -20,6 +20,8 @@ package org.apache.streams.twitter.provider;
 
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.FollowersListResponse;
+import org.apache.streams.twitter.api.FriendsIdsResponse;
 import org.apache.streams.twitter.api.FriendsListRequest;
 import org.apache.streams.twitter.api.FriendsListResponse;
 import org.apache.streams.twitter.api.Twitter;
@@ -32,10 +34,15 @@ import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+
 /**
  *  Retrieve friend or follower connections for a single user id.
  */
-public class TwitterFriendsListProviderTask implements Runnable {
+public class TwitterFriendsListProviderTask implements 
Callable<Iterator<FriendsListResponse>>, Runnable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TwitterFriendsListProviderTask.class);
 
@@ -44,6 +51,7 @@ public class TwitterFriendsListProviderTask implements 
Runnable {
   protected Twitter client;
   protected TwitterFollowingProvider provider;
   protected FriendsListRequest request;
+  protected List<FriendsListResponse> responseList;
 
   /**
    * TwitterFollowingProviderTask constructor.
@@ -67,9 +75,13 @@ public class TwitterFriendsListProviderTask implements 
Runnable {
 
     Preconditions.checkArgument(request.getId() != null || 
request.getScreenName() != null);
 
+    responseList = new ArrayList<>();
+
+    LOGGER.info("Thread Starting: {}", request.toString());
+
     getFriendsList(request);
 
-    LOGGER.info(request.getId() != null ? request.getId().toString() : 
request.getScreenName() + " Thread Finished");
+    LOGGER.info("Thread Finished: {}", request.toString());
 
   }
 
@@ -79,6 +91,8 @@ public class TwitterFriendsListProviderTask implements 
Runnable {
 
       FriendsListResponse response = client.list(request);
 
+      responseList.add(response);
+
       last_count = response.getUsers().size();
 
       if (response.getUsers().size() > 0) {
@@ -121,4 +135,9 @@ public class TwitterFriendsListProviderTask implements 
Runnable {
             && page_count <= provider.getConfig().getMaxPages());
   }
 
+  @Override
+  public Iterator<FriendsListResponse> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterRetweetsTask.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterRetweetsTask.java
index acbe008..b1b39b1 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterRetweetsTask.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterRetweetsTask.java
@@ -32,14 +32,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
  *  Retrieve recent posts for a single user id.
  */
-public class TwitterRetweetsTask implements Runnable {
+public class TwitterRetweetsTask implements Callable<Iterator<Tweet>>, 
Runnable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TwitterRetweetsTask.class);
 
@@ -76,4 +78,9 @@ public class TwitterRetweetsTask implements Runnable {
 
   }
 
+  @Override
+  public Iterator<Tweet> call() throws Exception {
+    run();
+    return provider.providerQueue.stream().map(x -> 
(Tweet)x.getDocument()).iterator();
+  }
 }
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index d9a0759..5512d35 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -25,11 +25,14 @@ import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.util.ExecutorUtils;
+import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.config.TwitterTimelineProviderConfiguration;
 import org.apache.streams.twitter.api.StatusesUserTimelineRequest;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -54,10 +57,13 @@ import java.io.PrintStream;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -71,7 +77,7 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor;
 /**
  * Retrieve recent posts from a list of user ids or names.
  */
-public class TwitterTimelineProvider implements StreamsProvider, Serializable {
+public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, StreamsProvider, Serializable {
 
   private static final String STREAMS_ID = "TwitterTimelineProvider";
 
@@ -95,14 +101,17 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
   protected int idsCount;
   protected Twitter client;
 
-  protected ListeningExecutorService executor;
+  protected ExecutorService executor;
 
   protected DateTime start;
   protected DateTime end;
 
+  StreamsConfiguration streamsConfiguration;
+
   protected final AtomicBoolean running = new AtomicBoolean();
 
-  protected List<ListenableFuture<Object>> futures = new ArrayList<>();
+  private List<Callable<Object>> tasks = new ArrayList<>();
+  private List<Future<Object>> futures = new ArrayList<>();
 
   /**
    * To use from command line:
@@ -193,9 +202,11 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
       this.config = (TwitterTimelineProviderConfiguration)configurationObject;
     }
 
+    streamsConfiguration = StreamsConfigurator.detectConfiguration(StreamsConfigurator.getConfig());
+
     try {
       lock.writeLock().lock();
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -233,7 +244,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     }
 
     executor = MoreExecutors.listeningDecorator(
-        TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(
+      ExecutorUtils.newFixedThreadPoolWithQueueSize(
             config.getThreadsPerProvider().intValue(),
             streamsConfiguration.getQueueSize().intValue()
         )
@@ -241,16 +252,25 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
     submitTimelineThreads(ids, names);
 
+    LOGGER.info("tasks: {}", tasks.size());
+    LOGGER.info("futures: {}", futures.size());
+
   }
 
   @Override
   public void startStream() {
 
-    LOGGER.debug("{} startStream", STREAMS_ID);
+    Objects.requireNonNull(executor);
+
+    LOGGER.info("startStream");
 
     running.set(true);
 
-    executor.shutdown();
+    LOGGER.info("running: {}", running.get());
+
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+
+    LOGGER.info("running: {}", running.get());
 
   }
 
@@ -260,27 +280,31 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
       StatusesUserTimelineRequest request = new StatusesUserTimelineRequest();
       request.setUserId(id);
       request.setCount(config.getPageSize());
-      TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(
+      Callable providerTask = new TwitterTimelineProviderTask(
           this,
           client,
           request
       );
-      ListenableFuture future = executor.submit(providerTask);
+      LOGGER.info("Thread Created: {}", request);
+      tasks.add(providerTask);
+      Future future = executor.submit(providerTask);
       futures.add(future);
-      LOGGER.info("Thread Submitted: {}", providerTask.request);
+      LOGGER.info("Thread Submitted: {}", request);
     }
     for (String name : names) {
       StatusesUserTimelineRequest request = new StatusesUserTimelineRequest();
       request.setScreenName(name);
       request.setCount(config.getPageSize());
-      TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(
+      Callable providerTask = new TwitterTimelineProviderTask(
           this,
           client,
           request
       );
-      ListenableFuture future = executor.submit(providerTask);
+      LOGGER.info("Thread Created: {}", request);
+      tasks.add(providerTask);
+      Future future = executor.submit(providerTask);
       futures.add(future);
-      LOGGER.info("Thread Submitted: {}", providerTask.request);
+      LOGGER.info("Thread Submitted: {}", request);
     }
   }
 
@@ -294,8 +318,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     try {
       lock.writeLock().lock();
       result = new StreamsResultSet(providerQueue);
-      result.setCounter(new DatumStatusCounter());
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -312,10 +335,6 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
   }
 
-  protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<StreamsDatum>();
-  }
-
   public StreamsResultSet readNew(BigInteger sequence) {
     LOGGER.debug("{} readNew", STREAMS_ID);
     throw new NotImplementedException();
@@ -338,35 +357,31 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
   @Override
   public void cleanUp() {
-    shutdownAndAwaitTermination(executor);
-  }
-
-  void shutdownAndAwaitTermination(ExecutorService pool) {
-    pool.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-          System.err.println("Pool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      pool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
   }
 
   @Override
   public boolean isRunning() {
-    if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
-      LOGGER.info("Completed");
+    LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty());
+    LOGGER.debug("providerQueue.size: {}", providerQueue.size());
+    LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
+    LOGGER.debug("tasks.size(): {}", tasks.size());
+    LOGGER.debug("futures.size(): {}", futures.size());
+    if ( tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() ) {
       running.set(false);
-      LOGGER.info("Exiting");
     }
+    LOGGER.debug("isRunning: ", running.get());
     return running.get();
   }
+
+  @Override
+  public Iterator<Tweet> call() {
+    prepare(config);
+    startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+    } while ( isRunning());
+    cleanUp();
+    return providerQueue.stream().map( x -> (Tweet)x.getDocument()).iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index f8816e0..6bc9822 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -30,14 +30,17 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
  *  Retrieve recent posts for a single user id.
  */
-public class TwitterTimelineProviderTask implements Runnable {
+public class TwitterTimelineProviderTask implements Callable<Iterator<Tweet>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProviderTask.class);
 
@@ -46,6 +49,7 @@ public class TwitterTimelineProviderTask implements Runnable {
   protected TwitterTimelineProvider provider;
   protected Twitter client;
   protected StatusesUserTimelineRequest request;
+  protected List<Tweet> responseList;
 
   /**
    * TwitterTimelineProviderTask constructor.
@@ -68,10 +72,14 @@ public class TwitterTimelineProviderTask implements Runnable {
 
     LOGGER.info("Thread Starting: {}", request.toString());
 
+    responseList = new ArrayList<>();
+
     do {
 
       List<Tweet> statuses = client.userTimeline(request);
 
+      responseList.addAll(statuses);
+
       last_count = statuses.size();
       if( statuses.size() > 0 ) {
 
@@ -106,5 +114,9 @@ public class TwitterTimelineProviderTask implements Runnable {
   }
 
 
-
+  @Override
+  public Iterator<Tweet> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 90cd23d..5e9970a 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -25,12 +25,15 @@ import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.util.ExecutorUtils;
+import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.config.TwitterFollowingConfiguration;
 import org.apache.streams.twitter.config.TwitterUserInformationConfiguration;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.api.UsersLookupRequest;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.User;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -56,10 +59,12 @@ import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -73,7 +78,7 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor;
 /**
  * Retrieve current profile status from a list of user ids or names.
  */
-public class TwitterUserInformationProvider implements StreamsProvider, Serializable {
+public class TwitterUserInformationProvider implements Callable<Iterator<User>>, StreamsProvider, Serializable {
 
   private static final String STREAMS_ID = "TwitterUserInformationProvider";
 
@@ -92,6 +97,8 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
   protected volatile Queue<StreamsDatum> providerQueue;
 
+  StreamsConfiguration streamsConfiguration;
+
   public TwitterUserInformationConfiguration getConfig() {
     return config;
   }
@@ -171,11 +178,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
   }
 
   // TODO: this should be abstracted out
-  public static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
-    return new ThreadPoolExecutor(numThreads, numThreads,
-        5000L, TimeUnit.MILLISECONDS,
-        new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
-  }
+
 
   /**
    * TwitterUserInformationProvider constructor.
@@ -214,7 +217,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     Objects.requireNonNull(config.getInfo());
     Objects.requireNonNull(config.getThreadsPerProvider());
 
-    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration();
+    streamsConfiguration = StreamsConfigurator.detectConfiguration();
 
     Objects.requireNonNull(streamsConfiguration.getQueueSize());
 
@@ -228,7 +231,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
     try {
       lock.writeLock().lock();
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -250,7 +253,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     }
 
     executor = MoreExecutors.listeningDecorator(
-        TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(
+        ExecutorUtils.newFixedThreadPoolWithQueueSize(
             config.getThreadsPerProvider().intValue(),
             streamsConfiguration.getQueueSize().intValue()
         )
@@ -332,8 +335,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     try {
       lock.writeLock().lock();
       result = new StreamsResultSet(providerQueue);
-      result.setCounter(new DatumStatusCounter());
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
       LOGGER.debug("readCurrent: {} Documents", result.size());
     } finally {
       lock.writeLock().unlock();
@@ -343,10 +345,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
   }
 
-  protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<>();
-  }
-
   public StreamsResultSet readNew(BigInteger sequence) {
     LOGGER.debug("{} readNew", STREAMS_ID);
     throw new NotImplementedException();
@@ -372,31 +370,23 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     return running.get();
   }
 
-  void shutdownAndAwaitTermination(ExecutorService pool) {
-    pool.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-          System.err.println("Pool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      pool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
-  }
-
   protected Twitter getTwitterClient() throws InstantiationException {
     return Twitter.getInstance(config);
   }
 
   @Override
   public void cleanUp() {
-    shutdownAndAwaitTermination(executor);
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+  }
+
+  @Override
+  public Iterator<User> call() throws Exception {
+    prepare(config);
+    startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+    } while ( isRunning());
+    cleanUp();
+    return providerQueue.stream().map( x -> (User)x.getDocument()).iterator();
   }
 }

-- 
To stop receiving notification emails like this one, please contact
sblack...@apache.org.

Reply via email to