This is an automated email from the ASF dual-hosted git repository. sblackmon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/streams.git
The following commit(s) were added to refs/heads/master by this push: new 948ceac STREAMS-677,STREAMS-680 (#516) 948ceac is described below commit 948ceac52c6e8fc1cfdf5578bcc66b3113e506c4 Author: Steve Blackmon <sblack...@apache.org> AuthorDate: Thu Oct 22 14:46:58 2020 -0500 STREAMS-677,STREAMS-680 (#516) * resolves STREAMS-677 and STREAMS-680 resolves STREAMS-677 resolves STREAMS-680 * reorganize imports and fix a few compile problems --- .../org/apache/streams/twitter/api/Twitter.java | 10 +- .../twitter/provider/SevenDaySearchProvider.java | 51 +++++++--- .../provider/SevenDaySearchProviderTask.java | 43 ++++---- .../twitter/provider/ThirtyDaySearchProvider.java | 110 ++++++--------------- .../provider/ThirtyDaySearchProviderTask.java | 63 ++++++------ .../provider/TwitterTimelineProviderTask.java | 26 ++++- .../TwitterTimelineProviderConfiguration.json | 9 ++ 7 files changed, 159 insertions(+), 153 deletions(-) diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java index ebb9c64..cf3e963 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java @@ -58,6 +58,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Implementation of all twitter interfaces using juneau. @@ -110,8 +112,8 @@ public class Twitter implements .setDefaultRequestConfig( RequestConfig.custom() .setConnectionRequestTimeout(5000) - .setConnectTimeout(5000) - .setSocketTimeout(5000) + .setConnectTimeout(60000) + .setSocketTimeout(60000) .setCookieSpec("easy") .build() ) @@ -122,6 +124,7 @@ public class Twitter implements .addInterceptorLast((HttpResponseInterceptor) (httpResponse, httpContext) -> LOGGER.debug(httpResponse.getStatusLine().toString())) .build(); this.restClientBuilder = RestClient.create() + .executorService(Executors.newCachedThreadPool(), false) .httpClient(httpclient, true) .parser( JsonParser.DEFAULT.builder() @@ -138,7 +141,8 @@ public class Twitter implements .retryable( configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), - new TwitterRetryHandler()); + new TwitterRetryHandler() + ); if( configuration.getDebug() ) { restClientBuilder = restClientBuilder.debug(); } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java index 2a59cba..86fd465 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java @@ -42,6 +42,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; +import org.apache.commons.collections.iterators.IteratorChain; import org.apache.commons.lang.NotImplementedException; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -59,6 +60,8 @@ import java.util.List; import java.util.Objects; import java.util.Queue; import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -97,8 +100,9 @@ public class SevenDaySearchProvider implements Callable<Iterator<Tweet>>, Stream StreamsConfiguration streamsConfiguration; - private List<Callable<Object>> tasks = new ArrayList<>(); - private List<Future<Object>> futures = new ArrayList<>(); + private List<Callable<Iterator<Tweet>>> tasks = new ArrayList<>(); + private List<Future<Iterator<Tweet>>> futures = new ArrayList<>(); + private CompletionService<Iterator<Tweet>> completionService; protected final AtomicBoolean running = new AtomicBoolean(); @@ -199,7 +203,15 @@ public class SevenDaySearchProvider implements Callable<Iterator<Tweet>>, Stream request = new SevenDaySearchRequest(); request.setQ(config.getQ()); - + request.setGeocode(config.getGeocode()); + if( !Objects.isNull(config.getIncludeEntities()) ) { + request.setIncludeEntities(config.getIncludeEntities().toString()); + } + request.setLang(config.getLang()); + request.setLocale(config.getLocale()); + if( !Objects.isNull(config.getResultType())) { + request.setResultType(config.getResultType()); + } streamsConfiguration = StreamsConfigurator.detectConfiguration(); try { @@ -217,6 +229,8 @@ public class SevenDaySearchProvider implements Callable<Iterator<Tweet>>, Stream ) ); + completionService = new ExecutorCompletionService<>(executor); + submitSearchThread(); } @@ -240,16 +254,16 @@ public class SevenDaySearchProvider implements Callable<Iterator<Tweet>>, Stream protected void submitSearchThread() { - Callable providerTask = new SevenDaySearchProviderTask( - this, - client, - request - ); - LOGGER.info("Thread Created: {}", request); - tasks.add(providerTask); - Future future = executor.submit(providerTask); - futures.add(future); - LOGGER.info("Thread Submitted: {}", request); + Callable providerTask = new SevenDaySearchProviderTask( + this, + client, + request + ); + LOGGER.info("Thread Created: {}", request); + tasks.add(providerTask); + Future<Iterator<Tweet>> future = completionService.submit(providerTask); + futures.add(future); + LOGGER.info("Thread Submitted: {}", request); } @@ -324,8 +338,15 @@ public class SevenDaySearchProvider implements Callable<Iterator<Tweet>>, Stream do { Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); } while ( isRunning()); + IteratorChain chain = new IteratorChain(); + int received = 0; + while(received < tasks.size()) { + Future<Iterator<Tweet>> resultFuture = completionService.take(); + Iterator<Tweet> result = resultFuture.get(); + chain.addIterator(result); + received ++; + } cleanUp(); - return providerQueue.stream().map( x -> ((Tweet)x.getDocument())).distinct().iterator(); - + return chain; } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java index d5b6d8d..719b734 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java @@ -18,14 +18,12 @@ package org.apache.streams.twitter.provider; -import org.apache.streams.core.StreamsDatum; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.twitter.api.SevenDaySearchRequest; import org.apache.streams.twitter.api.SevenDaySearchResponse; import org.apache.streams.twitter.api.Twitter; import org.apache.streams.twitter.converter.TwitterDateTimeFormat; import org.apache.streams.twitter.pojo.Tweet; -import org.apache.streams.util.ComponentUtils; import com.fasterxml.jackson.databind.ObjectMapper; @@ -40,7 +38,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; /** - * Retrieve recent posts for a single user id. + * Retrieve recent posts from standard seven day search. */ public class SevenDaySearchProviderTask implements Callable<Iterator<Tweet>>, Runnable { @@ -51,7 +49,7 @@ public class SevenDaySearchProviderTask implements Callable<Iterator<Tweet>>, Ru protected SevenDaySearchProvider provider; protected Twitter client; protected SevenDaySearchRequest request; - protected List<Tweet> responseList; + protected List<Tweet> responseList = new ArrayList<>(); /** * SevenDaySearchProviderTask constructor. @@ -63,15 +61,15 @@ public class SevenDaySearchProviderTask implements Callable<Iterator<Tweet>>, Ru this.provider = provider; this.client = twitter; this.request = request; - this.responseList = new ArrayList<>(); } int item_count = 0; int last_count = 0; int page_count = 0; + Long maxId = null; @Override - public void run() { + public Iterator<Tweet> call() throws Exception { LOGGER.info("Thread Starting: {}", request.toString()); @@ -81,48 +79,55 @@ public class SevenDaySearchProviderTask implements Callable<Iterator<Tweet>>, Ru List<Tweet> statuses = response.getStatuses(); + last_count = statuses.size(); + + page_count++; + responseList.addAll(statuses); - last_count = statuses.size(); if( statuses.size() > 0 ) { for (Tweet status : statuses) { if (item_count < provider.getConfig().getMaxItems()) { - ComponentUtils.offerUntilSuccess(new StreamsDatum(status), provider.providerQueue); + responseList.add(status); item_count++; } } Stream<Long> statusIds = statuses.stream().map(status -> status.getId()); - long minId = statusIds.reduce(Math::min).get(); - page_count++; - request.setMaxId(new Long(minId).toString()); + maxId = statusIds.reduce(Math::min).get(); + request.setMaxId(maxId.toString()); } + LOGGER.info("item_count: {} last_count: {} page_count: {} ", item_count, last_count, page_count); + } while (shouldContinuePulling(last_count, page_count, item_count)); - LOGGER.info("item_count: {} last_count: {} page_count: {} ", item_count, last_count, page_count); - + return responseList.iterator(); } - public boolean shouldContinuePulling(int count, int page_count, int item_count) { + public boolean shouldContinuePulling(int item_count, int last_count, int page_count) { + boolean shouldContinuePulling = last_count > 0; if ( item_count >= provider.getConfig().getMaxItems() ) { return false; } else if (page_count >= provider.getConfig().getMaxPages()) { return false; - } else { - return ( count > 0 ); } + LOGGER.info("shouldContinuePulling: ", shouldContinuePulling); + return shouldContinuePulling; } @Override - public Iterator<Tweet> call() throws Exception { - run(); - return responseList.iterator(); + public void run() { + try { + this.call(); + } catch (Exception e) { + e.printStackTrace(); + } } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java index 5e0a8ec..6340c55 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java @@ -21,11 +21,7 @@ package org.apache.streams.twitter.provider; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; import org.apache.streams.core.util.ExecutorUtils; -import org.apache.streams.core.util.QueueUtils; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.StreamsJacksonMapperConfiguration; import org.apache.streams.twitter.api.ThirtyDaySearchRequest; @@ -42,8 +38,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; -import org.apache.commons.lang.NotImplementedException; -import org.joda.time.DateTime; +import org.apache.commons.collections.iterators.IteratorChain; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,13 +47,14 @@ import java.io.File; import java.io.FileOutputStream; import java.io.PrintStream; import java.io.Serializable; -import java.math.BigInteger; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Objects; -import java.util.Queue; import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -71,7 +67,7 @@ import java.util.stream.Stream; /** * Retrieve recent posts from a list of user ids or names. */ -public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, StreamsProvider, Serializable { +public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, Serializable { private static final String STREAMS_ID = "ThirtyDaySearchProvider"; @@ -87,16 +83,15 @@ public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, Strea return config; } - protected volatile Queue<StreamsDatum> providerQueue; - protected ThirtyDaySearchRequest request; protected Twitter client; protected ExecutorService executor; - private List<Callable<Object>> tasks = new ArrayList<>(); - private List<Future<Object>> futures = new ArrayList<>(); + private List<Callable<Iterator<Tweet>>> tasks = new ArrayList<>(); + private List<Future<Iterator<Tweet>>> futures = new ArrayList<>(); + private CompletionService<Iterator<Tweet>> completionService; StreamsConfiguration streamsConfiguration; @@ -131,14 +126,12 @@ public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, Strea String configfile = args[0]; String outfile = args[1]; - Config reference = ConfigFactory.load(); File file = new File(configfile); assert (file.exists()); Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)); + StreamsConfigurator.addConfig(testResourceConfig); - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(); ThirtyDaySearchProviderConfiguration config = new ComponentConfigurator<>(ThirtyDaySearchProviderConfiguration.class).detectConfiguration(); ThirtyDaySearchProvider provider = new ThirtyDaySearchProvider(config); @@ -163,30 +156,16 @@ public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, Strea this.config = config; } - public Queue<StreamsDatum> getProviderQueue() { - return this.providerQueue; - } - - @Override public String getId() { return STREAMS_ID; } - @Override public void prepare(Object configurationObject) { if( !(configurationObject instanceof ThirtyDaySearchProviderConfiguration ) ) { this.config = (ThirtyDaySearchProviderConfiguration)configurationObject; } - try { - lock.writeLock().lock(); - providerQueue = QueueUtils.constructQueue(); - } finally { - lock.writeLock().unlock(); - } - - Objects.requireNonNull(providerQueue); Objects.requireNonNull(config.getOauth().getConsumerKey()); Objects.requireNonNull(config.getOauth().getConsumerSecret()); Objects.requireNonNull(config.getOauth().getAccessToken()); @@ -196,7 +175,8 @@ public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, Strea request = new ThirtyDaySearchRequest(); request.setQuery(config.getQuery()); - + request.setTag(config.getTag()); + request.setMaxResults(config.getPageSize()); streamsConfiguration = StreamsConfigurator.detectConfiguration(); try { @@ -214,11 +194,12 @@ public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, Strea ) ); + completionService = new ExecutorCompletionService<>(executor); + submitSearchThread(); } - @Override public void startStream() { Objects.requireNonNull(executor); @@ -238,54 +219,17 @@ public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, Strea protected void submitSearchThread() { Callable providerTask = new ThirtyDaySearchProviderTask( - this, - client, - request - ); + this, + client, + request + ); LOGGER.info("Thread Created: {}", request); tasks.add(providerTask); - Future future = executor.submit(providerTask); + Future<Iterator<Tweet>> future = completionService.submit(providerTask); futures.add(future); LOGGER.info("Thread Submitted: {}", request); } - @Override - public StreamsResultSet readCurrent() { - - StreamsResultSet result; - - LOGGER.debug("Providing {} docs", providerQueue.size()); - - try { - lock.writeLock().lock(); - result = new StreamsResultSet(providerQueue); - providerQueue = QueueUtils.constructQueue(); - } finally { - lock.writeLock().unlock(); - } - - if ( result.size() == 0 && providerQueue.isEmpty() && executor.isTerminated() ) { - LOGGER.info("Finished. Cleaning up..."); - - running.set(false); - - LOGGER.info("Exiting"); - } - - return result; - - } - - public StreamsResultSet readNew(BigInteger sequence) { - LOGGER.debug("{} readNew", STREAMS_ID); - throw new NotImplementedException(); - } - - public StreamsResultSet readRange(DateTime start, DateTime end) { - LOGGER.debug("{} readRange", STREAMS_ID); - throw new NotImplementedException(); - } - /** * get Twitter Client from TwitterUserInformationConfiguration. * @return result @@ -296,12 +240,10 @@ public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, Strea } - @Override public void cleanUp() { ExecutorUtils.shutdownAndAwaitTermination(executor); } - @Override public boolean isRunning() { LOGGER.debug("executor.isTerminated: {}", executor.isTerminated()); LOGGER.debug("tasks.size(): {}", tasks.size()); @@ -314,13 +256,21 @@ public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, Strea } @Override - public Iterator<Tweet> call() throws Exception { + public Iterator<Tweet> call() throws InterruptedException, ExecutionException { prepare(config); startStream(); do { Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - } while ( isRunning()); + } while (isRunning()); + IteratorChain chain = new IteratorChain(); + int received = 0; + while (received < tasks.size()) { + Future<Iterator<Tweet>> resultFuture = completionService.take(); + Iterator<Tweet> result = resultFuture.get(); + chain.addIterator(result); + received++; + } cleanUp(); - return providerQueue.stream().map( x -> ((Tweet)x.getDocument())).iterator(); + return chain; } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java index 92f1c64..7ef0493 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java @@ -18,6 +18,7 @@ package org.apache.streams.twitter.provider; +import com.google.common.base.Strings; import org.apache.streams.core.StreamsDatum; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.twitter.api.ThirtyDaySearchRequest; @@ -32,15 +33,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; +import java.util.*; import java.util.concurrent.Callable; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.stream.StreamSupport; /** - * Retrieve recent posts for a single user id. + * Retrieve recent posts from premium thirty day search. */ public class ThirtyDaySearchProviderTask implements Callable<Iterator<Tweet>>, Runnable { @@ -51,7 +51,7 @@ public class ThirtyDaySearchProviderTask implements Callable<Iterator<Tweet>>, R protected ThirtyDaySearchProvider provider; protected Twitter client; protected ThirtyDaySearchRequest request; - protected List<Tweet> responseList; + protected List<Tweet> responseList = new ArrayList<>(); /** * ThirtyDaySearchProviderTask constructor. @@ -63,15 +63,15 @@ public class ThirtyDaySearchProviderTask implements Callable<Iterator<Tweet>>, R this.provider = provider; this.client = twitter; this.request = request; - this.responseList = new ArrayList<>(); } int item_count = 0; int last_count = 0; int page_count = 0; + String next = null; @Override - public void run() { + public Iterator<Tweet> call() throws Exception { LOGGER.info("Thread Starting: {}", request.toString()); @@ -81,46 +81,47 @@ public class ThirtyDaySearchProviderTask implements Callable<Iterator<Tweet>>, R List<Tweet> statuses = response.getResults(); - responseList.addAll(statuses); - last_count = statuses.size(); - if( statuses.size() > 0 ) { - for (Tweet status : statuses) { + // count items but dont truncate response b/c we already paid for them + item_count += statuses.size(); - if (item_count < provider.getConfig().getMaxItems()) { - ComponentUtils.offerUntilSuccess(new StreamsDatum(status), provider.providerQueue); - item_count++; - } + page_count++; - } + responseList.addAll(statuses); - Stream<Long> statusIds = statuses.stream().map(status -> status.getId()); - page_count++; - request.setNext(response.getNext()); + next = response.getNext(); - } + request.setNext(next); - } - while (shouldContinuePulling(last_count, page_count, item_count)); + LOGGER.info("item_count: {} last_count: {} page_count: {} next: {} ", item_count, last_count, page_count, next); - LOGGER.info("item_count: {} last_count: {} page_count: {} ", item_count, last_count, page_count); + } + while (shouldContinuePulling(last_count, page_count, item_count, next)); + return responseList.iterator(); } - public boolean shouldContinuePulling(int count, int page_count, int item_count) { + public boolean shouldContinuePulling(int count, int page_count, int item_count, String next) { + boolean shouldContinuePulling = count > 0; + if (Strings.isNullOrEmpty(next)) { + shouldContinuePulling = false; + } if (item_count >= provider.getConfig().getMaxItems()) { - return false; + shouldContinuePulling = false; } else if (page_count >= provider.getConfig().getMaxPages()) { - return false; - } else { - return (count > 0); + shouldContinuePulling = false; } + LOGGER.info("shouldContinuePulling: ", shouldContinuePulling); + return shouldContinuePulling; } @Override - public Iterator<Tweet> call() throws Exception { - run(); - return responseList.iterator(); + public void run() { + try { + this.call(); + } catch (Exception e) { + e.printStackTrace(); + } } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java index 3afb785..d3ff1b9 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java @@ -31,7 +31,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Instant; import java.util.ArrayList; +import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; @@ -67,6 +69,7 @@ public class TwitterTimelineProviderTask implements Callable<Iterator<Tweet>>, R int item_count = 0; int last_count = 0; int page_count = 0; + Date earliest_timestamp = new Date(Long.MAX_VALUE); @Override public void run() { @@ -86,9 +89,19 @@ public class TwitterTimelineProviderTask implements Callable<Iterator<Tweet>>, R for (Tweet status : statuses) { + earliest_timestamp = Date.from(Instant.ofEpochMilli(Math.min(earliest_timestamp.getTime(), status.getCreatedAt().getTime()))); + if (item_count < provider.getConfig().getMaxItems()) { - ComponentUtils.offerUntilSuccess(new StreamsDatum(status), provider.providerQueue); - item_count++; + + if( (provider.getConfig().getMinTimestamp() != null && + earliest_timestamp.after(provider.getConfig().getMinTimestamp())) || + provider.getConfig().getMinTimestamp() == null ) { + + ComponentUtils.offerUntilSuccess(new StreamsDatum(status), provider.providerQueue); + item_count++; + + } + } } @@ -101,14 +114,17 @@ public class TwitterTimelineProviderTask implements Callable<Iterator<Tweet>>, R } } - while (shouldContinuePulling(last_count, page_count, item_count)); + while (shouldContinuePulling(last_count, page_count, item_count, earliest_timestamp)); LOGGER.info("item_count: {} last_count: {} page_count: {} ", item_count, last_count, page_count); } - public boolean shouldContinuePulling(int count, int page_count, int item_count) { - if (item_count == provider.getConfig().getMaxItems()) { + public boolean shouldContinuePulling(int count, int page_count, int item_count, Date earliest_timestamp) { + if (provider.getConfig().getMinTimestamp() != null && + earliest_timestamp.before(provider.getConfig().getMinTimestamp())) { + return false; + } else if (item_count == provider.getConfig().getMaxItems()) { return false; } else if (page_count == provider.getConfig().getMaxPages()) { return false; diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json index 41f2dd3..3bca4e4 100644 --- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json +++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json @@ -16,6 +16,15 @@ "max_pages": { "type": "integer", "description": "Max items per page to request" + }, + "min_timestamp": { + "type": "string", + "format": "date-time", + "description": "Earliest timestamp permitted" + }, + "page_size": { + "type": "integer", + "description": "Max items per page to request" } } } \ No newline at end of file