This is an automated email from the ASF dual-hosted git repository.

sblackmon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/streams.git


The following commit(s) were added to refs/heads/master by this push:
     new 948ceac  STREAMS-677,STREAMS-680 (#516)
948ceac is described below

commit 948ceac52c6e8fc1cfdf5578bcc66b3113e506c4
Author: Steve Blackmon <sblack...@apache.org>
AuthorDate: Thu Oct 22 14:46:58 2020 -0500

    STREAMS-677,STREAMS-680 (#516)
    
    * resolves STREAMS-677 and STREAMS-680
    
    resolves STREAMS-677
    resolves STREAMS-680
    
    * reorganize imports and fix a few compile problems
---
 .../org/apache/streams/twitter/api/Twitter.java    |  10 +-
 .../twitter/provider/SevenDaySearchProvider.java   |  51 +++++++---
 .../provider/SevenDaySearchProviderTask.java       |  43 ++++----
 .../twitter/provider/ThirtyDaySearchProvider.java  | 110 ++++++---------------
 .../provider/ThirtyDaySearchProviderTask.java      |  63 ++++++------
 .../provider/TwitterTimelineProviderTask.java      |  26 ++++-
 .../TwitterTimelineProviderConfiguration.json      |   9 ++
 7 files changed, 159 insertions(+), 153 deletions(-)

diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java
index ebb9c64..cf3e963 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java
@@ -58,6 +58,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * Implementation of all twitter interfaces using juneau.
@@ -110,8 +112,8 @@ public class Twitter implements
       .setDefaultRequestConfig(
         RequestConfig.custom()
           .setConnectionRequestTimeout(5000)
-          .setConnectTimeout(5000)
-          .setSocketTimeout(5000)
+          .setConnectTimeout(60000)
+          .setSocketTimeout(60000)
           .setCookieSpec("easy")
           .build()
       )
@@ -122,6 +124,7 @@ public class Twitter implements
       .addInterceptorLast((HttpResponseInterceptor) (httpResponse, 
httpContext) -> LOGGER.debug(httpResponse.getStatusLine().toString()))
       .build();
     this.restClientBuilder = RestClient.create()
+      .executorService(Executors.newCachedThreadPool(), false)
       .httpClient(httpclient, true)
       .parser(
         JsonParser.DEFAULT.builder()
@@ -138,7 +141,8 @@ public class Twitter implements
       .retryable(
         configuration.getRetryMax().intValue(),
         configuration.getRetrySleepMs().intValue(),
-        new TwitterRetryHandler());
+        new TwitterRetryHandler()
+      );
     if( configuration.getDebug() ) {
       restClientBuilder = restClientBuilder.debug();
     }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
index 2a59cba..86fd465 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
@@ -42,6 +42,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
 
+import org.apache.commons.collections.iterators.IteratorChain;
 import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -59,6 +60,8 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -97,8 +100,9 @@ public class SevenDaySearchProvider implements 
Callable<Iterator<Tweet>>, Stream
 
   StreamsConfiguration streamsConfiguration;
 
-  private List<Callable<Object>> tasks = new ArrayList<>();
-  private List<Future<Object>> futures = new ArrayList<>();
+  private List<Callable<Iterator<Tweet>>> tasks = new ArrayList<>();
+  private List<Future<Iterator<Tweet>>> futures = new ArrayList<>();
+  private CompletionService<Iterator<Tweet>> completionService;
 
   protected final AtomicBoolean running = new AtomicBoolean();
 
@@ -199,7 +203,15 @@ public class SevenDaySearchProvider implements 
Callable<Iterator<Tweet>>, Stream
 
     request = new SevenDaySearchRequest();
     request.setQ(config.getQ());
-
+    request.setGeocode(config.getGeocode());
+    if( !Objects.isNull(config.getIncludeEntities()) ) {
+      request.setIncludeEntities(config.getIncludeEntities().toString());
+    }
+    request.setLang(config.getLang());
+    request.setLocale(config.getLocale());
+    if( !Objects.isNull(config.getResultType())) {
+      request.setResultType(config.getResultType());
+    }
     streamsConfiguration = StreamsConfigurator.detectConfiguration();
 
     try {
@@ -217,6 +229,8 @@ public class SevenDaySearchProvider implements 
Callable<Iterator<Tweet>>, Stream
         )
     );
 
+    completionService = new ExecutorCompletionService<>(executor);
+
     submitSearchThread();
 
   }
@@ -240,16 +254,16 @@ public class SevenDaySearchProvider implements 
Callable<Iterator<Tweet>>, Stream
 
   protected void submitSearchThread() {
 
-      Callable providerTask = new SevenDaySearchProviderTask(
-          this,
-          client,
-        request
-      );
-      LOGGER.info("Thread Created: {}", request);
-      tasks.add(providerTask);
-      Future future = executor.submit(providerTask);
-      futures.add(future);
-      LOGGER.info("Thread Submitted: {}", request);
+    Callable providerTask = new SevenDaySearchProviderTask(
+      this,
+      client,
+      request
+    );
+    LOGGER.info("Thread Created: {}", request);
+    tasks.add(providerTask);
+    Future<Iterator<Tweet>> future = completionService.submit(providerTask);
+    futures.add(future);
+    LOGGER.info("Thread Submitted: {}", request);
 
   }
 
@@ -324,8 +338,15 @@ public class SevenDaySearchProvider implements 
Callable<Iterator<Tweet>>, Stream
     do {
       
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
     } while ( isRunning());
+    IteratorChain chain = new IteratorChain();
+    int received = 0;
+    while(received < tasks.size()) {
+      Future<Iterator<Tweet>> resultFuture = completionService.take();
+      Iterator<Tweet> result = resultFuture.get();
+      chain.addIterator(result);
+      received ++;
+    }
     cleanUp();
-    return providerQueue.stream().map( x -> 
((Tweet)x.getDocument())).distinct().iterator();
-
+    return chain;
   }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
index d5b6d8d..719b734 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
@@ -18,14 +18,12 @@
 
 package org.apache.streams.twitter.provider;
 
-import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.api.SevenDaySearchRequest;
 import org.apache.streams.twitter.api.SevenDaySearchResponse;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
 import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.util.ComponentUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -40,7 +38,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
- *  Retrieve recent posts for a single user id.
+ *  Retrieve recent posts from standard seven day search.
  */
 public class SevenDaySearchProviderTask implements Callable<Iterator<Tweet>>, 
Runnable {
 
@@ -51,7 +49,7 @@ public class SevenDaySearchProviderTask implements 
Callable<Iterator<Tweet>>, Ru
   protected SevenDaySearchProvider provider;
   protected Twitter client;
   protected SevenDaySearchRequest request;
-  protected List<Tweet> responseList;
+  protected List<Tweet> responseList = new ArrayList<>();
 
   /**
    * SevenDaySearchProviderTask constructor.
@@ -63,15 +61,15 @@ public class SevenDaySearchProviderTask implements 
Callable<Iterator<Tweet>>, Ru
     this.provider = provider;
     this.client = twitter;
     this.request = request;
-    this.responseList = new ArrayList<>();
   }
 
   int item_count = 0;
   int last_count = 0;
   int page_count = 0;
+  Long maxId = null;
 
   @Override
-  public void run() {
+  public Iterator<Tweet> call() throws Exception {
 
     LOGGER.info("Thread Starting: {}", request.toString());
 
@@ -81,48 +79,55 @@ public class SevenDaySearchProviderTask implements 
Callable<Iterator<Tweet>>, Ru
 
       List<Tweet> statuses = response.getStatuses();
 
+      last_count = statuses.size();
+
+      page_count++;
+
       responseList.addAll(statuses);
 
-      last_count = statuses.size();
       if( statuses.size() > 0 ) {
 
         for (Tweet status : statuses) {
 
           if (item_count < provider.getConfig().getMaxItems()) {
-            ComponentUtils.offerUntilSuccess(new StreamsDatum(status), 
provider.providerQueue);
+            responseList.add(status);
             item_count++;
           }
 
         }
 
         Stream<Long> statusIds = statuses.stream().map(status -> 
status.getId());
-        long minId = statusIds.reduce(Math::min).get();
-        page_count++;
-        request.setMaxId(new Long(minId).toString());
+        maxId = statusIds.reduce(Math::min).get();
+        request.setMaxId(maxId.toString());
 
       }
 
+      LOGGER.info("item_count: {} last_count: {} page_count: {} ", item_count, 
last_count, page_count);
+
     }
     while (shouldContinuePulling(last_count, page_count, item_count));
 
-    LOGGER.info("item_count: {} last_count: {} page_count: {} ", item_count, 
last_count, page_count);
-    
+    return responseList.iterator();
   }
 
-  public boolean shouldContinuePulling(int count, int page_count, int 
item_count) {
+  public boolean shouldContinuePulling(int item_count, int last_count, int 
page_count) {
+    boolean shouldContinuePulling = last_count > 0;
     if ( item_count >= provider.getConfig().getMaxItems() ) {
       return false;
     } else if (page_count >= provider.getConfig().getMaxPages()) {
       return false;
-    } else {
-      return ( count > 0 );
     }
+    LOGGER.info("shouldContinuePulling: ", shouldContinuePulling);
+    return shouldContinuePulling;
   }
 
   @Override
-  public Iterator<Tweet> call() throws Exception {
-    run();
-    return responseList.iterator();
+  public void run() {
+    try {
+      this.call();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
   }
 
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
index 5e0a8ec..6340c55 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
@@ -21,11 +21,7 @@ package org.apache.streams.twitter.provider;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.core.util.ExecutorUtils;
-import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.StreamsJacksonMapperConfiguration;
 import org.apache.streams.twitter.api.ThirtyDaySearchRequest;
@@ -42,8 +38,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
 
-import org.apache.commons.lang.NotImplementedException;
-import org.joda.time.DateTime;
+import org.apache.commons.collections.iterators.IteratorChain;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,13 +47,14 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.io.Serializable;
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
-import java.util.Queue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -71,7 +67,7 @@ import java.util.stream.Stream;
 /**
  * Retrieve recent posts from a list of user ids or names.
  */
-public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, 
StreamsProvider, Serializable {
+public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, 
Serializable {
 
   private static final String STREAMS_ID = "ThirtyDaySearchProvider";
 
@@ -87,16 +83,15 @@ public class ThirtyDaySearchProvider implements 
Callable<Iterator<Tweet>>, Strea
     return config;
   }
 
-  protected volatile Queue<StreamsDatum> providerQueue;
-
   protected ThirtyDaySearchRequest request;
 
   protected Twitter client;
 
   protected ExecutorService executor;
 
-  private List<Callable<Object>> tasks = new ArrayList<>();
-  private List<Future<Object>> futures = new ArrayList<>();
+  private List<Callable<Iterator<Tweet>>> tasks = new ArrayList<>();
+  private List<Future<Iterator<Tweet>>> futures = new ArrayList<>();
+  private CompletionService<Iterator<Tweet>> completionService;
 
   StreamsConfiguration streamsConfiguration;
 
@@ -131,14 +126,12 @@ public class ThirtyDaySearchProvider implements 
Callable<Iterator<Tweet>>, Strea
     String configfile = args[0];
     String outfile = args[1];
 
-    Config reference = ConfigFactory.load();
     File file = new File(configfile);
     assert (file.exists());
     Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+    StreamsConfigurator.addConfig(testResourceConfig);
 
-    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-
-    StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
+    StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration();
     ThirtyDaySearchProviderConfiguration config = new 
ComponentConfigurator<>(ThirtyDaySearchProviderConfiguration.class).detectConfiguration();
     ThirtyDaySearchProvider provider = new ThirtyDaySearchProvider(config);
 
@@ -163,30 +156,16 @@ public class ThirtyDaySearchProvider implements 
Callable<Iterator<Tweet>>, Strea
     this.config = config;
   }
 
-  public Queue<StreamsDatum> getProviderQueue() {
-    return this.providerQueue;
-  }
-
-  @Override
   public String getId() {
     return STREAMS_ID;
   }
 
-  @Override
   public void prepare(Object configurationObject) {
 
     if( !(configurationObject instanceof ThirtyDaySearchProviderConfiguration 
) ) {
       this.config = (ThirtyDaySearchProviderConfiguration)configurationObject;
     }
 
-    try {
-      lock.writeLock().lock();
-      providerQueue = QueueUtils.constructQueue();
-    } finally {
-      lock.writeLock().unlock();
-    }
-
-    Objects.requireNonNull(providerQueue);
     Objects.requireNonNull(config.getOauth().getConsumerKey());
     Objects.requireNonNull(config.getOauth().getConsumerSecret());
     Objects.requireNonNull(config.getOauth().getAccessToken());
@@ -196,7 +175,8 @@ public class ThirtyDaySearchProvider implements 
Callable<Iterator<Tweet>>, Strea
 
     request = new ThirtyDaySearchRequest();
     request.setQuery(config.getQuery());
-
+    request.setTag(config.getTag());
+    request.setMaxResults(config.getPageSize());
     streamsConfiguration = StreamsConfigurator.detectConfiguration();
 
     try {
@@ -214,11 +194,12 @@ public class ThirtyDaySearchProvider implements 
Callable<Iterator<Tweet>>, Strea
         )
     );
 
+    completionService = new ExecutorCompletionService<>(executor);
+
     submitSearchThread();
 
   }
 
-  @Override
   public void startStream() {
 
     Objects.requireNonNull(executor);
@@ -238,54 +219,17 @@ public class ThirtyDaySearchProvider implements 
Callable<Iterator<Tweet>>, Strea
   protected void submitSearchThread() {
 
     Callable providerTask = new ThirtyDaySearchProviderTask(
-          this,
-          client,
-        request
-      );
+      this,
+      client,
+      request
+    );
     LOGGER.info("Thread Created: {}", request);
     tasks.add(providerTask);
-    Future future = executor.submit(providerTask);
+    Future<Iterator<Tweet>> future = completionService.submit(providerTask);
     futures.add(future);
     LOGGER.info("Thread Submitted: {}", request);
   }
 
-  @Override
-  public StreamsResultSet readCurrent() {
-
-    StreamsResultSet result;
-
-    LOGGER.debug("Providing {} docs", providerQueue.size());
-
-    try {
-      lock.writeLock().lock();
-      result = new StreamsResultSet(providerQueue);
-      providerQueue = QueueUtils.constructQueue();
-    } finally {
-      lock.writeLock().unlock();
-    }
-
-    if ( result.size() == 0 && providerQueue.isEmpty() && 
executor.isTerminated() ) {
-      LOGGER.info("Finished.  Cleaning up...");
-
-      running.set(false);
-
-      LOGGER.info("Exiting");
-    }
-
-    return result;
-
-  }
-
-  public StreamsResultSet readNew(BigInteger sequence) {
-    LOGGER.debug("{} readNew", STREAMS_ID);
-    throw new NotImplementedException();
-  }
-
-  public StreamsResultSet readRange(DateTime start, DateTime end) {
-    LOGGER.debug("{} readRange", STREAMS_ID);
-    throw new NotImplementedException();
-  }
-
   /**
    * get Twitter Client from TwitterUserInformationConfiguration.
    * @return result
@@ -296,12 +240,10 @@ public class ThirtyDaySearchProvider implements 
Callable<Iterator<Tweet>>, Strea
 
   }
 
-  @Override
   public void cleanUp() {
     ExecutorUtils.shutdownAndAwaitTermination(executor);
   }
 
-  @Override
   public boolean isRunning() {
     LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
     LOGGER.debug("tasks.size(): {}", tasks.size());
@@ -314,13 +256,21 @@ public class ThirtyDaySearchProvider implements 
Callable<Iterator<Tweet>>, Strea
   }
 
   @Override
-  public Iterator<Tweet> call() throws Exception {
+  public Iterator<Tweet> call() throws InterruptedException, 
ExecutionException {
     prepare(config);
     startStream();
     do {
       
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
-    } while ( isRunning());
+    } while (isRunning());
+    IteratorChain chain = new IteratorChain();
+    int received = 0;
+    while (received < tasks.size()) {
+      Future<Iterator<Tweet>> resultFuture = completionService.take();
+      Iterator<Tweet> result = resultFuture.get();
+      chain.addIterator(result);
+      received++;
+    }
     cleanUp();
-    return providerQueue.stream().map( x -> 
((Tweet)x.getDocument())).iterator();
+    return chain;
   }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
index 92f1c64..7ef0493 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
@@ -18,6 +18,7 @@
 
 package org.apache.streams.twitter.provider;
 
+import com.google.common.base.Strings;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.api.ThirtyDaySearchRequest;
@@ -32,15 +33,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 /**
- *  Retrieve recent posts for a single user id.
+ *  Retrieve recent posts from premium thirty day search.
  */
 public class ThirtyDaySearchProviderTask implements Callable<Iterator<Tweet>>, 
Runnable {
 
@@ -51,7 +51,7 @@ public class ThirtyDaySearchProviderTask implements 
Callable<Iterator<Tweet>>, R
   protected ThirtyDaySearchProvider provider;
   protected Twitter client;
   protected ThirtyDaySearchRequest request;
-  protected List<Tweet> responseList;
+  protected List<Tweet> responseList = new ArrayList<>();
 
   /**
    * ThirtyDaySearchProviderTask constructor.
@@ -63,15 +63,15 @@ public class ThirtyDaySearchProviderTask implements 
Callable<Iterator<Tweet>>, R
     this.provider = provider;
     this.client = twitter;
     this.request = request;
-    this.responseList = new ArrayList<>();
   }
 
   int item_count = 0;
   int last_count = 0;
   int page_count = 0;
+  String next = null;
 
   @Override
-  public void run() {
+  public Iterator<Tweet> call() throws Exception {
 
     LOGGER.info("Thread Starting: {}", request.toString());
 
@@ -81,46 +81,47 @@ public class ThirtyDaySearchProviderTask implements 
Callable<Iterator<Tweet>>, R
 
       List<Tweet> statuses = response.getResults();
 
-      responseList.addAll(statuses);
-
       last_count = statuses.size();
-      if( statuses.size() > 0 ) {
 
-        for (Tweet status : statuses) {
+      // count items but dont truncate response b/c we already paid for them
+      item_count += statuses.size();
 
-          if (item_count < provider.getConfig().getMaxItems()) {
-            ComponentUtils.offerUntilSuccess(new StreamsDatum(status), 
provider.providerQueue);
-            item_count++;
-          }
+      page_count++;
 
-        }
+      responseList.addAll(statuses);
 
-        Stream<Long> statusIds = statuses.stream().map(status -> 
status.getId());
-        page_count++;
-        request.setNext(response.getNext());
+      next = response.getNext();
 
-      }
+      request.setNext(next);
 
-    }
-    while (shouldContinuePulling(last_count, page_count, item_count));
+      LOGGER.info("item_count: {} last_count: {} page_count: {} next: {} ", 
item_count, last_count, page_count, next);
 
-    LOGGER.info("item_count: {} last_count: {} page_count: {} ", item_count, 
last_count, page_count);
+    }
+    while (shouldContinuePulling(last_count, page_count, item_count, next));
 
+    return responseList.iterator();
   }
 
-  public boolean shouldContinuePulling(int count, int page_count, int 
item_count) {
+  public boolean shouldContinuePulling(int count, int page_count, int 
item_count, String next) {
+    boolean shouldContinuePulling = count > 0;
+    if (Strings.isNullOrEmpty(next)) {
+      shouldContinuePulling = false;
+    }
     if (item_count >= provider.getConfig().getMaxItems()) {
-      return false;
+      shouldContinuePulling = false;
     } else if (page_count >= provider.getConfig().getMaxPages()) {
-      return false;
-    } else {
-      return (count > 0);
+      shouldContinuePulling = false;
     }
+    LOGGER.info("shouldContinuePulling: ", shouldContinuePulling);
+    return shouldContinuePulling;
   }
 
   @Override
-  public Iterator<Tweet> call() throws Exception {
-    run();
-    return responseList.iterator();
+  public void run() {
+    try {
+      this.call();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
   }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index 3afb785..d3ff1b9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -31,7 +31,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -67,6 +69,7 @@ public class TwitterTimelineProviderTask implements 
Callable<Iterator<Tweet>>, R
   int item_count = 0;
   int last_count = 0;
   int page_count = 0;
+  Date earliest_timestamp = new Date(Long.MAX_VALUE);
 
   @Override
   public void run() {
@@ -86,9 +89,19 @@ public class TwitterTimelineProviderTask implements 
Callable<Iterator<Tweet>>, R
 
         for (Tweet status : statuses) {
 
+          earliest_timestamp = 
Date.from(Instant.ofEpochMilli(Math.min(earliest_timestamp.getTime(), 
status.getCreatedAt().getTime())));
+
           if (item_count < provider.getConfig().getMaxItems()) {
-            ComponentUtils.offerUntilSuccess(new StreamsDatum(status), 
provider.providerQueue);
-            item_count++;
+
+            if( (provider.getConfig().getMinTimestamp() != null &&
+                  
earliest_timestamp.after(provider.getConfig().getMinTimestamp())) ||
+                provider.getConfig().getMinTimestamp() == null ) {
+
+              ComponentUtils.offerUntilSuccess(new StreamsDatum(status), 
provider.providerQueue);
+              item_count++;
+
+            }
+
           }
 
         }
@@ -101,14 +114,17 @@ public class TwitterTimelineProviderTask implements 
Callable<Iterator<Tweet>>, R
       }
 
     }
-    while (shouldContinuePulling(last_count, page_count, item_count));
+    while (shouldContinuePulling(last_count, page_count, item_count, 
earliest_timestamp));
 
     LOGGER.info("item_count: {} last_count: {} page_count: {} ", item_count, 
last_count, page_count);
     
   }
 
-  public boolean shouldContinuePulling(int count, int page_count, int 
item_count) {
-    if (item_count == provider.getConfig().getMaxItems()) {
+  public boolean shouldContinuePulling(int count, int page_count, int 
item_count, Date earliest_timestamp) {
+    if (provider.getConfig().getMinTimestamp() != null &&
+          earliest_timestamp.before(provider.getConfig().getMinTimestamp())) {
+      return false;
+    } else if (item_count == provider.getConfig().getMaxItems()) {
       return false;
     } else if (page_count == provider.getConfig().getMaxPages()) {
       return false;
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json
index 41f2dd3..3bca4e4 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json
@@ -16,6 +16,15 @@
         "max_pages": {
             "type": "integer",
             "description": "Max items per page to request"
+        },
+        "min_timestamp": {
+            "type": "string",
+            "format": "date-time",
+            "description": "Earliest timestamp permitted"
+        },
+        "page_size": {
+            "type": "integer",
+            "description": "Max items per page to request"
         }
     }
 }
\ No newline at end of file

Reply via email to