This is an automated email from the ASF dual-hosted git repository. sblackmon pushed a commit to branch STREAMS-605 in repository https://gitbox.apache.org/repos/asf/streams.git
commit 66508b0cb3ff198b58a43cc90eeb1299bbcafb57 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> AuthorDate: Mon Jun 11 11:16:25 2018 -0500 resolves STREAMS-605 --- .../twitter/provider/SevenDaySearchProvider.java | 119 +++++++------- .../provider/SevenDaySearchProviderTask.java | 13 +- .../twitter/provider/ThirtyDaySearchProvider.java | 121 +++++++------- .../provider/ThirtyDaySearchProviderTask.java | 14 +- .../twitter/provider/TwitterEngagersProvider.java | 175 +++++++++++---------- .../provider/TwitterFollowersIdsProviderTask.java | 18 ++- .../provider/TwitterFollowersListProviderTask.java | 20 ++- .../twitter/provider/TwitterFollowingProvider.java | 149 +++++++++--------- .../provider/TwitterFriendsIdsProviderTask.java | 17 +- .../provider/TwitterFriendsListProviderTask.java | 23 ++- .../twitter/provider/TwitterRetweetsTask.java | 9 +- .../twitter/provider/TwitterTimelineProvider.java | 99 +++++++----- .../provider/TwitterTimelineProviderTask.java | 16 +- .../provider/TwitterUserInformationProvider.java | 60 +++---- 14 files changed, 489 insertions(+), 364 deletions(-) diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java index 10a2661..2c76a5f 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java @@ -25,11 +25,15 @@ import org.apache.streams.core.DatumStatusCounter; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.core.util.ExecutorUtils; +import org.apache.streams.core.util.QueueUtils; import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.StreamsJacksonMapperConfiguration; import org.apache.streams.twitter.config.SevenDaySearchProviderConfiguration; import org.apache.streams.twitter.api.SevenDaySearchRequest; import org.apache.streams.twitter.api.Twitter; import org.apache.streams.twitter.converter.TwitterDateTimeFormat; +import org.apache.streams.twitter.pojo.Tweet; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -54,10 +58,13 @@ import java.io.PrintStream; import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Queue; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -69,7 +76,7 @@ import java.util.stream.Stream; /** * Retrieve recent posts from a list of user ids or names. */ -public class SevenDaySearchProvider implements StreamsProvider, Serializable { +public class SevenDaySearchProvider implements Callable<Iterator<Tweet>>, StreamsProvider, Serializable { private static final String STREAMS_ID = "SevenDaySearchProvider"; @@ -91,11 +98,14 @@ public class SevenDaySearchProvider implements StreamsProvider, Serializable { protected Twitter client; - protected ListeningExecutorService executor; + protected ExecutorService executor; - protected final AtomicBoolean running = new AtomicBoolean(); + StreamsConfiguration streamsConfiguration; + + private List<Callable<Object>> tasks = new ArrayList<>(); + private List<Future<Object>> futures = new ArrayList<>(); - private List<ListenableFuture<Object>> futures = new ArrayList<>(); + protected final AtomicBoolean running = new AtomicBoolean(); /** * To use from command line: @@ -136,26 +146,21 @@ public class SevenDaySearchProvider implements StreamsProvider, Serializable { SevenDaySearchProviderConfiguration config = new ComponentConfigurator<>(SevenDaySearchProviderConfiguration.class).detectConfiguration(); SevenDaySearchProvider provider = new SevenDaySearchProvider(config); - ObjectMapper mapper = new StreamsJacksonMapper(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList())); - PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - provider.prepare(config); - provider.startStream(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - for (StreamsDatum datum : provider.readCurrent()) { - String json; - try { - json = mapper.writeValueAsString(datum.getDocument()); - outStream.println(json); - } catch (JsonProcessingException ex) { - System.err.println(ex.getMessage()); - } + ObjectMapper mapper = StreamsJacksonMapper.getInstance(new StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList()))); + + Iterator<Tweet> results = provider.call(); + + results.forEachRemaining(d -> { + try { + outStream.println(mapper.writeValueAsString(d)); + } catch( Exception e ) { + LOGGER.warn("Exception", e); } - } - while ( provider.isRunning() ); - provider.cleanUp(); + }); + outStream.flush(); + } public SevenDaySearchProvider() { @@ -184,7 +189,7 @@ public class SevenDaySearchProvider implements StreamsProvider, Serializable { try { lock.writeLock().lock(); - providerQueue = constructQueue(); + providerQueue = QueueUtils.constructQueue(); } finally { lock.writeLock().unlock(); } @@ -200,7 +205,7 @@ public class SevenDaySearchProvider implements StreamsProvider, Serializable { request = new SevenDaySearchRequest(); request.setQ(config.getQ()); - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(); + streamsConfiguration = StreamsConfigurator.detectConfiguration(); try { client = getTwitterClient(); @@ -211,7 +216,7 @@ public class SevenDaySearchProvider implements StreamsProvider, Serializable { Objects.requireNonNull(client); executor = MoreExecutors.listeningDecorator( - TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize( + ExecutorUtils.newFixedThreadPoolWithQueueSize( config.getThreadsPerProvider().intValue(), streamsConfiguration.getQueueSize().intValue() ) @@ -224,24 +229,32 @@ public class SevenDaySearchProvider implements StreamsProvider, Serializable { @Override public void startStream() { - LOGGER.debug("{} startStream", STREAMS_ID); + Objects.requireNonNull(executor); + + LOGGER.info("startStream"); running.set(true); - executor.shutdown(); + LOGGER.info("running: {}", running.get()); + + ExecutorUtils.shutdownAndAwaitTermination(executor); + + LOGGER.info("running: {}", running.get()); } protected void submitSearchThread() { - SevenDaySearchProviderTask providerTask = new SevenDaySearchProviderTask( + Callable providerTask = new SevenDaySearchProviderTask( this, client, request ); - ListenableFuture future = executor.submit(providerTask); + LOGGER.info("Thread Created: {}", request); + tasks.add(providerTask); + Future future = executor.submit(providerTask); futures.add(future); - LOGGER.info("Thread Submitted: {}", providerTask.request); + LOGGER.info("Thread Submitted: {}", request); } @@ -255,8 +268,7 @@ public class SevenDaySearchProvider implements StreamsProvider, Serializable { try { lock.writeLock().lock(); result = new StreamsResultSet(providerQueue); - result.setCounter(new DatumStatusCounter()); - providerQueue = constructQueue(); + providerQueue = QueueUtils.constructQueue(); } finally { lock.writeLock().unlock(); } @@ -273,10 +285,6 @@ public class SevenDaySearchProvider implements StreamsProvider, Serializable { } - protected Queue<StreamsDatum> constructQueue() { - return new LinkedBlockingQueue<StreamsDatum>(); - } - public StreamsResultSet readNew(BigInteger sequence) { LOGGER.debug("{} readNew", STREAMS_ID); throw new NotImplementedException(); @@ -299,35 +307,30 @@ public class SevenDaySearchProvider implements StreamsProvider, Serializable { @Override public void cleanUp() { - shutdownAndAwaitTermination(executor); - } - - void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - System.err.println("Pool did not terminate"); - } - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } + ExecutorUtils.shutdownAndAwaitTermination(executor); } @Override public boolean isRunning() { - if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { - LOGGER.info("Completed"); + LOGGER.debug("executor.isTerminated: {}", executor.isTerminated()); + LOGGER.debug("tasks.size(): {}", tasks.size()); + LOGGER.debug("futures.size(): {}", futures.size()); + if ( tasks.size() > 0 && tasks.size() == futures.size() && executor.isTerminated() ) { running.set(false); - LOGGER.info("Exiting"); } + LOGGER.debug("isRunning: {}", running.get()); return running.get(); } + + @Override + public Iterator<Tweet> call() throws Exception { + prepare(config); + startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + } while ( isRunning()); + cleanUp(); + return providerQueue.stream().map( x -> ((Tweet)x.getDocument())).distinct().iterator(); + + } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java index 8bb0b0f..ac094dc 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java @@ -32,14 +32,16 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; import java.util.List; +import java.util.concurrent.Callable; import java.util.stream.Collectors; import java.util.stream.Stream; /** * Retrieve recent posts for a single user id. */ -public class SevenDaySearchProviderTask implements Runnable { +public class SevenDaySearchProviderTask implements Callable<Iterator<Tweet>>, Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(SevenDaySearchProviderTask.class); @@ -48,6 +50,7 @@ public class SevenDaySearchProviderTask implements Runnable { protected SevenDaySearchProvider provider; protected Twitter client; protected SevenDaySearchRequest request; + protected List<Tweet> responseList; /** * SevenDaySearchProviderTask constructor. @@ -76,6 +79,8 @@ public class SevenDaySearchProviderTask implements Runnable { List<Tweet> statuses = response.getStatuses(); + responseList.addAll(statuses); + last_count = statuses.size(); if( statuses.size() > 0 ) { @@ -109,6 +114,10 @@ public class SevenDaySearchProviderTask implements Runnable { && page_count <= provider.getConfig().getMaxPages()); } - + @Override + public Iterator<Tweet> call() throws Exception { + run(); + return responseList.iterator(); + } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java index 080a30d..287ab4f 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java @@ -25,11 +25,15 @@ import org.apache.streams.core.DatumStatusCounter; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.core.util.ExecutorUtils; +import org.apache.streams.core.util.QueueUtils; import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.StreamsJacksonMapperConfiguration; import org.apache.streams.twitter.config.ThirtyDaySearchProviderConfiguration; import org.apache.streams.twitter.api.ThirtyDaySearchRequest; import org.apache.streams.twitter.api.Twitter; import org.apache.streams.twitter.converter.TwitterDateTimeFormat; +import org.apache.streams.twitter.pojo.Tweet; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -54,10 +58,13 @@ import java.io.PrintStream; import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Queue; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -69,7 +76,7 @@ import java.util.stream.Stream; /** * Retrieve recent posts from a list of user ids or names. */ -public class ThirtyDaySearchProvider implements StreamsProvider, Serializable { +public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, StreamsProvider, Serializable { private static final String STREAMS_ID = "ThirtyDaySearchProvider"; @@ -91,11 +98,14 @@ public class ThirtyDaySearchProvider implements StreamsProvider, Serializable { protected Twitter client; - protected ListeningExecutorService executor; + protected ExecutorService executor; - protected final AtomicBoolean running = new AtomicBoolean(); + private List<Callable<Object>> tasks = new ArrayList<>(); + private List<Future<Object>> futures = new ArrayList<>(); + + StreamsConfiguration streamsConfiguration; - private List<ListenableFuture<Object>> futures = new ArrayList<>(); + protected final AtomicBoolean running = new AtomicBoolean(); /** * To use from command line: @@ -137,26 +147,21 @@ public class ThirtyDaySearchProvider implements StreamsProvider, Serializable { ThirtyDaySearchProviderConfiguration config = new ComponentConfigurator<>(ThirtyDaySearchProviderConfiguration.class).detectConfiguration(); ThirtyDaySearchProvider provider = new ThirtyDaySearchProvider(config); - ObjectMapper mapper = new StreamsJacksonMapper(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList())); - PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - provider.prepare(config); - provider.startStream(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - for (StreamsDatum datum : provider.readCurrent()) { - String json; - try { - json = mapper.writeValueAsString(datum.getDocument()); - outStream.println(json); - } catch (JsonProcessingException ex) { - System.err.println(ex.getMessage()); - } + ObjectMapper mapper = StreamsJacksonMapper.getInstance(new StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList()))); + + Iterator<Tweet> results = provider.call(); + + results.forEachRemaining(d -> { + try { + outStream.println(mapper.writeValueAsString(d)); + } catch( Exception e ) { + LOGGER.warn("Exception", e); } - } - while ( provider.isRunning() ); - provider.cleanUp(); + }); + outStream.flush(); + } public ThirtyDaySearchProvider(ThirtyDaySearchProviderConfiguration config) { @@ -181,7 +186,7 @@ public class ThirtyDaySearchProvider implements StreamsProvider, Serializable { try { lock.writeLock().lock(); - providerQueue = constructQueue(); + providerQueue = QueueUtils.constructQueue(); } finally { lock.writeLock().unlock(); } @@ -197,7 +202,7 @@ public class ThirtyDaySearchProvider implements StreamsProvider, Serializable { request = new ThirtyDaySearchRequest(); request.setQuery(config.getQuery()); - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(); + streamsConfiguration = StreamsConfigurator.detectConfiguration(); try { client = getTwitterClient(); @@ -208,7 +213,7 @@ public class ThirtyDaySearchProvider implements StreamsProvider, Serializable { Objects.requireNonNull(client); executor = MoreExecutors.listeningDecorator( - TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize( + ExecutorUtils.newFixedThreadPoolWithQueueSize( config.getThreadsPerProvider().intValue(), streamsConfiguration.getQueueSize().intValue() ) @@ -221,25 +226,32 @@ public class ThirtyDaySearchProvider implements StreamsProvider, Serializable { @Override public void startStream() { - LOGGER.debug("{} startStream", STREAMS_ID); + Objects.requireNonNull(executor); + + LOGGER.info("startStream"); running.set(true); - executor.shutdown(); + LOGGER.info("running: {}", running.get()); + + ExecutorUtils.shutdownAndAwaitTermination(executor); + + LOGGER.info("running: {}", running.get()); } protected void submitSearchThread() { - ThirtyDaySearchProviderTask providerTask = new ThirtyDaySearchProviderTask( + Callable providerTask = new ThirtyDaySearchProviderTask( this, client, request ); - ListenableFuture future = executor.submit(providerTask); - futures.add(future); - LOGGER.info("Thread Submitted: {}", providerTask.request); - + LOGGER.info("Thread Created: {}", request); + tasks.add(providerTask); + Future future = executor.submit(providerTask); + futures.add(future); + LOGGER.info("Thread Submitted: {}", request); } @Override @@ -252,8 +264,7 @@ public class ThirtyDaySearchProvider implements StreamsProvider, Serializable { try { lock.writeLock().lock(); result = new StreamsResultSet(providerQueue); - result.setCounter(new DatumStatusCounter()); - providerQueue = constructQueue(); + providerQueue = QueueUtils.constructQueue(); } finally { lock.writeLock().unlock(); } @@ -270,10 +281,6 @@ public class ThirtyDaySearchProvider implements StreamsProvider, Serializable { } - protected Queue<StreamsDatum> constructQueue() { - return new LinkedBlockingQueue<StreamsDatum>(); - } - public StreamsResultSet readNew(BigInteger sequence) { LOGGER.debug("{} readNew", STREAMS_ID); throw new NotImplementedException(); @@ -296,35 +303,29 @@ public class ThirtyDaySearchProvider implements StreamsProvider, Serializable { @Override public void cleanUp() { - shutdownAndAwaitTermination(executor); - } - - void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - System.err.println("Pool did not terminate"); - } - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } + ExecutorUtils.shutdownAndAwaitTermination(executor); } @Override public boolean isRunning() { - if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { - LOGGER.info("Completed"); + LOGGER.debug("executor.isTerminated: {}", executor.isTerminated()); + LOGGER.debug("tasks.size(): {}", tasks.size()); + LOGGER.debug("futures.size(): {}", futures.size()); + if ( tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() ) { running.set(false); - LOGGER.info("Exiting"); } + LOGGER.debug("isRunning: {}", running.get()); return running.get(); } + + @Override + public Iterator<Tweet> call() throws Exception { + prepare(config); + startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + } while ( isRunning()); + cleanUp(); + return providerQueue.stream().map( x -> ((Tweet)x.getDocument())).iterator(); + } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java index 07b40e5..dea770b 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java @@ -31,14 +31,16 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; import java.util.List; +import java.util.concurrent.Callable; import java.util.stream.Collectors; import java.util.stream.Stream; /** * Retrieve recent posts for a single user id. */ -public class ThirtyDaySearchProviderTask implements Runnable { +public class ThirtyDaySearchProviderTask implements Callable<Iterator<Tweet>>, Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(ThirtyDaySearchProviderTask.class); @@ -47,6 +49,7 @@ public class ThirtyDaySearchProviderTask implements Runnable { protected ThirtyDaySearchProvider provider; protected Twitter client; protected ThirtyDaySearchRequest request; + protected List<Tweet> responseList; /** * ThirtyDaySearchProviderTask constructor. @@ -75,6 +78,8 @@ public class ThirtyDaySearchProviderTask implements Runnable { List<Tweet> statuses = response.getResults(); + responseList.addAll(statuses); + last_count = statuses.size(); if( statuses.size() > 0 ) { @@ -107,6 +112,9 @@ public class ThirtyDaySearchProviderTask implements Runnable { && page_count <= provider.getConfig().getMaxPages()); } - - + @Override + public Iterator<Tweet> call() throws Exception { + run(); + return responseList.iterator(); + } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEngagersProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEngagersProvider.java index bbb2881..e244f50 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEngagersProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEngagersProvider.java @@ -24,8 +24,11 @@ import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.core.util.ExecutorUtils; +import org.apache.streams.core.util.QueueUtils; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.StreamsJacksonMapperConfiguration; +import org.apache.streams.twitter.api.Twitter; import org.apache.streams.twitter.config.TwitterEngagersProviderConfiguration; import org.apache.streams.twitter.config.TwitterTimelineProviderConfiguration; import org.apache.streams.twitter.api.RetweetsRequest; @@ -36,6 +39,7 @@ import org.apache.streams.twitter.pojo.User; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -58,8 +62,11 @@ import java.math.BigInteger; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Queue; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -73,7 +80,7 @@ import static org.apache.streams.twitter.provider.TwitterUserInformationProvider /** * Retrieve posts from a list of user ids or names, then provide all of the users who retweeted those posts. */ -public class TwitterEngagersProvider extends TwitterTimelineProvider implements StreamsProvider, Serializable { +public class TwitterEngagersProvider implements Callable<Iterator<User>>, StreamsProvider, Serializable { private static final String STREAMS_ID = "TwitterEngagersProvider"; @@ -91,14 +98,19 @@ public class TwitterEngagersProvider extends TwitterTimelineProvider implements protected final AtomicBoolean running = new AtomicBoolean(); - private List<ListenableFuture<Object>> futures = new ArrayList<>(); + protected Twitter client; - protected ListeningExecutorService executor; + private List<Callable<Object>> tasks = new ArrayList<>(); + private List<Future<Object>> futures = new ArrayList<>(); + + public static ExecutorService executor; StreamsConfiguration streamsConfiguration; RetweetsRequest baseRetweetsRequest; + TwitterTimelineProvider timelineProvider; + /** * To use from command line: * @@ -134,29 +146,22 @@ public class TwitterEngagersProvider extends TwitterTimelineProvider implements Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)).withFallback(StreamsConfigurator.getConfig()); StreamsConfigurator.addConfig(testResourceConfig); - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(); + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + ObjectMapper mapper = StreamsJacksonMapper.getInstance(new StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList()))); + TwitterEngagersProviderConfiguration config = new ComponentConfigurator<>(TwitterEngagersProviderConfiguration.class).detectConfiguration(); TwitterEngagersProvider provider = new TwitterEngagersProvider(config); - ObjectMapper mapper = StreamsJacksonMapper.getInstance(new StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList()))); + Iterator<User> results = provider.call(); - PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - provider.prepare(config); - provider.startStream(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - for (StreamsDatum datum : provider.readCurrent()) { - String json; - try { - json = mapper.writeValueAsString(datum.getDocument()); - outStream.println(json); - } catch (JsonProcessingException ex) { - System.err.println(ex.getMessage()); - } + results.forEachRemaining(d -> { + try { + outStream.println(mapper.writeValueAsString(d)); + } catch( Exception e ) { + LOGGER.warn("Exception", e); } - } - while ( provider.isRunning() ); - provider.cleanUp(); + }); + outStream.flush(); } @@ -176,27 +181,37 @@ public class TwitterEngagersProvider extends TwitterTimelineProvider implements @Override public void prepare(Object configurationObject) { + timelineProvider = new TwitterTimelineProvider(config); + if( configurationObject instanceof TwitterEngagersProviderConfiguration ) { this.config = (TwitterEngagersProviderConfiguration)configurationObject; - super.prepare(MAPPER.convertValue(this.config, TwitterTimelineProviderConfiguration.class)); + timelineProvider.prepare(MAPPER.convertValue(this.config, TwitterTimelineProviderConfiguration.class)); } else if( configurationObject instanceof TwitterTimelineProviderConfiguration ) { - super.prepare(configurationObject); + timelineProvider.prepare(configurationObject); this.config = MAPPER.convertValue(this.config, TwitterEngagersProviderConfiguration.class); } else { - super.prepare(null); + timelineProvider.prepare(null); } streamsConfiguration = StreamsConfigurator.detectConfiguration(StreamsConfigurator.getConfig()); try { lock.writeLock().lock(); - providerQueue = constructQueue(); + providerQueue = QueueUtils.constructQueue(); } finally { lock.writeLock().unlock(); } + try { + client = getTwitterClient(); + } catch (InstantiationException e) { + LOGGER.error("InstantiationException", e); + } + + Objects.requireNonNull(client); + executor = MoreExecutors.listeningDecorator( - TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize( + ExecutorUtils.newFixedThreadPoolWithQueueSize( config.getThreadsPerProvider().intValue(), streamsConfiguration.getQueueSize().intValue() ) @@ -206,41 +221,50 @@ public class TwitterEngagersProvider extends TwitterTimelineProvider implements } + /** + * get Twitter Client from TwitterUserInformationConfiguration. + * @return result + */ + public Twitter getTwitterClient() throws InstantiationException { + + return Twitter.getInstance(config); + + } + @Override public void startStream() { LOGGER.debug("{} startStream", STREAMS_ID); - super.startStream(); + Iterator<Tweet> timelineIterator = timelineProvider.call(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - Iterator<StreamsDatum> iterator = super.readCurrent().iterator(); - while (iterator.hasNext()) { - StreamsDatum datum = iterator.next(); - Tweet tweet = (Tweet) datum.getDocument(); - submitRetweeterIdsTaskThread(tweet.getId()); - } - } - while ( super.isRunning() ); - super.cleanUp(); - executor.shutdown(); + List<Tweet> timelineList = Lists.newArrayList(timelineIterator); + + LOGGER.info("running: {}", running.get()); + + timelineList.forEach(tweet -> submitRetweeterIdsTaskThread(tweet.getId())); + + ExecutorUtils.shutdownAndAwaitTermination(executor); + + LOGGER.info("running: {}", running.get()); } protected void submitRetweeterIdsTaskThread( Long postId ) { + Callable<Object> callable = createTask(postId); + LOGGER.info("Thread Created: {}", postId); + tasks.add(callable); + futures.add(executor.submit(callable)); + LOGGER.info("Thread Submitted: {}", postId); + + } + + protected Callable createTask( Long postId ) { RetweetsRequest request = new ComponentConfigurator<>(RetweetsRequest.class).detectConfiguration(); request.setId(postId); - TwitterRetweetsTask providerTask = new TwitterRetweetsTask( - this, - client, - request - ); - ListenableFuture future = executor.submit(providerTask); - super.futures.add(future); - LOGGER.info("Thread Submitted: {}", providerTask.request); - + Callable callable = new TwitterRetweetsTask(this, client, request); + return callable; } @Override @@ -252,7 +276,7 @@ public class TwitterEngagersProvider extends TwitterTimelineProvider implements try { lock.writeLock().lock(); - Queue<StreamsDatum> resultQueue = constructQueue(); + Queue<StreamsDatum> resultQueue = QueueUtils.constructQueue(); providerQueue.iterator().forEachRemaining( datum -> { Tweet tweet = ((Tweet) datum.getDocument()); @@ -266,12 +290,12 @@ public class TwitterEngagersProvider extends TwitterTimelineProvider implements } ); result = new StreamsResultSet(resultQueue); - providerQueue = constructQueue(); + providerQueue = QueueUtils.constructQueue(); } finally { lock.writeLock().unlock(); } - if ( result.size() == 0 && providerQueue.isEmpty() && executor.isTerminated() ) { + if ( result.size() == 0 && providerQueue.isEmpty() && executor.isShutdown() && executor.isTerminated() ) { LOGGER.info("Finished. Cleaning up..."); running.set(false); @@ -283,10 +307,6 @@ public class TwitterEngagersProvider extends TwitterTimelineProvider implements } - protected Queue<StreamsDatum> constructQueue() { - return new LinkedBlockingQueue<StreamsDatum>(); - } - public StreamsResultSet readNew(BigInteger sequence) { LOGGER.debug("{} readNew", STREAMS_ID); throw new NotImplementedException(); @@ -299,36 +319,31 @@ public class TwitterEngagersProvider extends TwitterTimelineProvider implements @Override public void cleanUp() { - super.cleanUp(); - shutdownAndAwaitTermination(executor); + ExecutorUtils.shutdownAndAwaitTermination(executor); } - void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - System.err.println("Pool did not terminate"); - } - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } - - @Override public boolean isRunning() { - if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { - LOGGER.info("Completed"); + LOGGER.debug("timelineProvider.isRunning: {}", timelineProvider.isRunning()); + LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty()); + LOGGER.debug("providerQueue.size: {}", providerQueue.size()); + LOGGER.debug("executor.isTerminated: {}", executor.isTerminated()); + LOGGER.debug("tasks.size(): {}", tasks.size()); + LOGGER.debug("futures.size(): {}", futures.size()); + if ( timelineProvider.isRunning() == false && tasks.size() > 0 && tasks.size() == futures.size() && executor.isTerminated() ) { running.set(false); - LOGGER.info("Exiting"); } + LOGGER.debug("isRunning: {}", running.get()); return running.get(); } + + @Override + public Iterator<User> call() { + prepare(config); + startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + } while ( isRunning()); + cleanUp(); + return providerQueue.stream().map( x -> ((Tweet)x.getDocument()).getUser()).distinct().iterator(); + } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java index 7acbeef..0b2305a 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java @@ -22,6 +22,7 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.twitter.api.FollowersIdsRequest; import org.apache.streams.twitter.api.FollowersIdsResponse; +import org.apache.streams.twitter.api.FriendsIdsResponse; import org.apache.streams.twitter.api.Twitter; import org.apache.streams.twitter.pojo.Follow; import org.apache.streams.twitter.pojo.User; @@ -32,10 +33,15 @@ import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; + /** * Retrieve friend or follower connections for a single user id. */ -public class TwitterFollowersIdsProviderTask implements Runnable { +public class TwitterFollowersIdsProviderTask implements Callable<Iterator<FollowersIdsResponse>>, Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowersIdsProviderTask.class); @@ -44,6 +50,7 @@ public class TwitterFollowersIdsProviderTask implements Runnable { protected Twitter client; protected TwitterFollowingProvider provider; protected FollowersIdsRequest request; + protected List<FollowersIdsResponse> responseList; /** * TwitterFollowingProviderTask constructor. @@ -62,6 +69,8 @@ public class TwitterFollowersIdsProviderTask implements Runnable { Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null); + responseList = new ArrayList<>(); + LOGGER.info("Thread Starting: {}", request.toString()); getFollowersIds(request); @@ -81,6 +90,8 @@ public class TwitterFollowersIdsProviderTask implements Runnable { FollowersIdsResponse response = client.ids(request); + responseList.add(response); + last_count = response.getIds().size(); if (response.getIds().size() > 0) { @@ -122,4 +133,9 @@ public class TwitterFollowersIdsProviderTask implements Runnable { && page_count <= provider.getConfig().getMaxPages()); } + @Override + public Iterator<FollowersIdsResponse> call() throws Exception { + run(); + return responseList.iterator(); + } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java index 5195a13..94ec455 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java @@ -28,13 +28,19 @@ import org.apache.streams.twitter.pojo.User; import org.apache.streams.util.ComponentUtils; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; + /** * Retrieve friend or follower connections for a single user id. */ -public class TwitterFollowersListProviderTask implements Runnable { +public class TwitterFollowersListProviderTask implements Callable<Iterator<FollowersListResponse>>, Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowersListProviderTask.class); @@ -43,6 +49,7 @@ public class TwitterFollowersListProviderTask implements Runnable { protected Twitter client; protected TwitterFollowingProvider provider; protected FollowersListRequest request; + protected List<FollowersListResponse> responseList; /** * TwitterFollowersListProviderTask constructor. @@ -59,8 +66,12 @@ public class TwitterFollowersListProviderTask implements Runnable { @Override public void run() { + Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null); + LOGGER.info("Thread Starting: {}", request.toString()); + responseList = new ArrayList<>(); + getFollowersList(request); LOGGER.info("Thread Finished: {}", request.toString()); @@ -78,6 +89,8 @@ public class TwitterFollowersListProviderTask implements Runnable { FollowersListResponse response = client.list(request); + responseList.add(response); + last_count = response.getUsers().size(); if (response.getUsers().size() > 0) { @@ -118,4 +131,9 @@ public class TwitterFollowersListProviderTask implements Runnable { && page_count <= provider.getConfig().getMaxPages()); } + @Override + public Iterator<FollowersListResponse> call() throws Exception { + run(); + return responseList.iterator(); + } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java index 7e6949c..3b45cf3 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java @@ -25,6 +25,8 @@ import org.apache.streams.core.DatumStatusCounter; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.core.util.ExecutorUtils; +import org.apache.streams.core.util.QueueUtils; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.StreamsJacksonMapperConfiguration; import org.apache.streams.twitter.config.TwitterFollowingConfiguration; @@ -34,6 +36,7 @@ import org.apache.streams.twitter.api.FriendsIdsRequest; import org.apache.streams.twitter.api.FriendsListRequest; import org.apache.streams.twitter.api.Twitter; import org.apache.streams.twitter.converter.TwitterDateTimeFormat; +import org.apache.streams.twitter.pojo.Follow; import org.apache.streams.twitter.pojo.User; import com.fasterxml.jackson.core.JsonProcessingException; @@ -43,6 +46,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; @@ -59,26 +63,34 @@ import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Queue; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static sun.misc.PostVMInitHook.run; /** * Retrieve all follow adjacencies from a list of user ids or names. */ -public class TwitterFollowingProvider implements StreamsProvider, Serializable { +public class TwitterFollowingProvider implements Callable<Iterator<Follow>>, StreamsProvider, Serializable { public static final String STREAMS_ID = "TwitterFollowingProvider"; private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProvider.class); private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private StreamsConfiguration streamsConfiguration; private TwitterFollowingConfiguration config; protected List<String> names = new ArrayList<>(); @@ -86,9 +98,10 @@ public class TwitterFollowingProvider implements StreamsProvider, Serializable { protected Twitter client; - protected ListeningExecutorService executor; + public static ExecutorService executor; - private List<ListenableFuture<Object>> futures = new ArrayList<>(); + private List<Callable<Object>> tasks = new ArrayList<>(); + private List<Future<Object>> futures = new ArrayList<>(); protected final AtomicBoolean running = new AtomicBoolean(); @@ -129,31 +142,22 @@ public class TwitterFollowingProvider implements StreamsProvider, Serializable { Config configFile = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults()); StreamsConfigurator.addConfig(configFile); - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(); + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + ObjectMapper mapper = StreamsJacksonMapper.getInstance(new StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList()))); + TwitterFollowingConfiguration config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(); TwitterFollowingProvider provider = new TwitterFollowingProvider(config); - StreamsJacksonMapperConfiguration mapperConfiguration = new StreamsJacksonMapperConfiguration() - .withDateFormats(Collections.singletonList(TwitterDateTimeFormat.TWITTER_FORMAT)); - ObjectMapper mapper = StreamsJacksonMapper.getInstance(mapperConfiguration); + Iterator<Follow> results = provider.call(); - PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - provider.prepare(config); - provider.startStream(); - do { - Thread.sleep(streamsConfiguration.getBatchFrequencyMs()); - for (StreamsDatum datum : provider.readCurrent()) { - String json; - try { - json = mapper.writeValueAsString(datum.getDocument()); - outStream.println(json); - } catch (JsonProcessingException ex) { - System.err.println(ex.getMessage()); - } + results.forEachRemaining(d -> { + try { + outStream.println(mapper.writeValueAsString(d)); + } catch( Exception e ) { + LOGGER.warn("Exception", e); } - } - while ( provider.isRunning()); - provider.cleanUp(); + }); + outStream.flush(); } @@ -178,6 +182,8 @@ public class TwitterFollowingProvider implements StreamsProvider, Serializable { public void prepare(Object configurationObject) { + this.streamsConfiguration = StreamsConfigurator.detectConfiguration(); + if( configurationObject instanceof TwitterFollowingConfiguration) { this.config = (TwitterFollowingConfiguration) configurationObject; } @@ -203,7 +209,7 @@ public class TwitterFollowingProvider implements StreamsProvider, Serializable { try { lock.writeLock().lock(); - providerQueue = constructQueue(); + providerQueue = QueueUtils.constructQueue(); } finally { lock.writeLock().unlock(); } @@ -228,7 +234,7 @@ public class TwitterFollowingProvider implements StreamsProvider, Serializable { Objects.requireNonNull(getConfig().getEndpoint()); executor = MoreExecutors.listeningDecorator( - TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize( + ExecutorUtils.newFixedThreadPoolWithQueueSize( config.getThreadsPerProvider().intValue(), streamsConfiguration.getQueueSize().intValue() ) @@ -237,12 +243,18 @@ public class TwitterFollowingProvider implements StreamsProvider, Serializable { Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers")); for (Long id : ids) { - submitTask(createTask(id, null)); + Callable<Object> callable = createTask(id, null); + LOGGER.info("Thread Created: {}", id); + tasks.add(callable); + futures.add(executor.submit(callable)); LOGGER.info("Thread Submitted: {}", id); } for (String name : names) { - submitTask(createTask(null, name)); + Callable<Object> callable = createTask(null, name); + LOGGER.info("Thread Created: {}", name); + tasks.add(callable); + futures.add(executor.submit(callable)); LOGGER.info("Thread Submitted: {}", name); } @@ -256,45 +268,42 @@ public class TwitterFollowingProvider implements StreamsProvider, Serializable { running.set(true); - LOGGER.info("isRunning"); + LOGGER.info("running: {}", running.get()); + + ExecutorUtils.shutdownAndAwaitTermination(executor); - executor.shutdown(); + LOGGER.info("running: {}", running.get()); } - protected Runnable createTask(Long id, String name) { + protected Callable createTask(Long id, String name) { if( config.getEndpoint().equals("friends") && config.getIdsOnly() == true ) { FriendsIdsRequest request = (FriendsIdsRequest)new FriendsIdsRequest().withId(id).withScreenName(name); return new TwitterFriendsIdsProviderTask( - this, - client, - request); + this, + client, + request); } else if( config.getEndpoint().equals("friends") && config.getIdsOnly() == false ) { FriendsListRequest request = (FriendsListRequest)new FriendsListRequest().withId(id).withScreenName(name); return new TwitterFriendsListProviderTask( - this, - client, - request); + this, + client, + request); } else if( config.getEndpoint().equals("followers") && config.getIdsOnly() == true ) { FollowersIdsRequest request = (FollowersIdsRequest)new FollowersIdsRequest().withId(id).withScreenName(name); return new TwitterFollowersIdsProviderTask( - this, - client, - request); + this, + client, + request); } else if( config.getEndpoint().equals("followers") && config.getIdsOnly() == false ) { FollowersListRequest request = (FollowersListRequest)new FollowersListRequest().withId(id).withScreenName(name); return new TwitterFollowersListProviderTask( - this, - client, - request); + this, + client, + request); } else return null; } - protected void submitTask(Runnable providerTask) { - ListenableFuture future = executor.submit(providerTask); - futures.add(future); - } - protected Twitter getTwitterClient() throws InstantiationException { return Twitter.getInstance(config); } @@ -306,8 +315,7 @@ public class TwitterFollowingProvider implements StreamsProvider, Serializable { try { lock.writeLock().lock(); result = new StreamsResultSet(providerQueue); - result.setCounter(new DatumStatusCounter()); - providerQueue = constructQueue(); + providerQueue = QueueUtils.constructQueue(); LOGGER.debug("readCurrent: {} Documents", result.size()); } finally { lock.writeLock().unlock(); @@ -332,40 +340,29 @@ public class TwitterFollowingProvider implements StreamsProvider, Serializable { } public boolean isRunning() { - if ( providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone() ) { - LOGGER.info("All Threads Completed"); + LOGGER.debug("executor.isTerminated: {}", executor.isTerminated()); + LOGGER.debug("tasks.size(): {}", tasks.size()); + LOGGER.debug("futures.size(): {}", futures.size()); + if ( tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() ) { running.set(false); - LOGGER.info("Exiting"); } + LOGGER.debug("isRunning: {}", running.get()); return running.get(); } - // abstract this out - protected Queue<StreamsDatum> constructQueue() { - return new LinkedBlockingQueue<>(); + public void cleanUp() { + // cleanUp } - // abstract this out - void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - System.err.println("Pool did not terminate"); - } - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } + @Override + public Iterator<Follow> call() { + prepare(config); + startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + } while ( isRunning()); + cleanUp(); + return providerQueue.stream().map( x -> (Follow)x.getDocument()).iterator(); } - public void cleanUp() { - shutdownAndAwaitTermination(executor); - } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java index 5a53123..402e825 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java @@ -32,10 +32,15 @@ import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; + /** * Retrieve friend or follower connections for a single user id. */ -public class TwitterFriendsIdsProviderTask implements Runnable { +public class TwitterFriendsIdsProviderTask implements Callable<Iterator<FriendsIdsResponse>>, Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFriendsIdsProviderTask.class); @@ -44,6 +49,7 @@ public class TwitterFriendsIdsProviderTask implements Runnable { protected Twitter client; protected TwitterFollowingProvider provider; protected FriendsIdsRequest request; + protected List<FriendsIdsResponse> responseList; /** * TwitterFollowingProviderTask constructor. @@ -62,6 +68,8 @@ public class TwitterFriendsIdsProviderTask implements Runnable { Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null); + responseList = new ArrayList<>(); + LOGGER.info("Thread Starting: {}", request.toString()); getFriendsIds(request); @@ -81,6 +89,8 @@ public class TwitterFriendsIdsProviderTask implements Runnable { FriendsIdsResponse response = client.ids(request); + responseList.add(response); + last_count = response.getIds().size(); if (response.getIds().size() > 0) { @@ -123,4 +133,9 @@ public class TwitterFriendsIdsProviderTask implements Runnable { && page_count <= provider.getConfig().getMaxPages()); } + @Override + public Iterator<FriendsIdsResponse> call() throws Exception { + run(); + return responseList.iterator(); + } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java index e116f2f..b2917e2 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java @@ -20,6 +20,8 @@ package org.apache.streams.twitter.provider; import org.apache.streams.core.StreamsDatum; import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.twitter.api.FollowersListResponse; +import org.apache.streams.twitter.api.FriendsIdsResponse; import org.apache.streams.twitter.api.FriendsListRequest; import org.apache.streams.twitter.api.FriendsListResponse; import org.apache.streams.twitter.api.Twitter; @@ -32,10 +34,15 @@ import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; + /** * Retrieve friend or follower connections for a single user id. */ -public class TwitterFriendsListProviderTask implements Runnable { +public class TwitterFriendsListProviderTask implements Callable<Iterator<FriendsListResponse>>, Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFriendsListProviderTask.class); @@ -44,6 +51,7 @@ public class TwitterFriendsListProviderTask implements Runnable { protected Twitter client; protected TwitterFollowingProvider provider; protected FriendsListRequest request; + protected List<FriendsListResponse> responseList; /** * TwitterFollowingProviderTask constructor. @@ -67,9 +75,13 @@ public class TwitterFriendsListProviderTask implements Runnable { Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null); + responseList = new ArrayList<>(); + + LOGGER.info("Thread Starting: {}", request.toString()); + getFriendsList(request); - LOGGER.info(request.getId() != null ? request.getId().toString() : request.getScreenName() + " Thread Finished"); + LOGGER.info("Thread Finished: {}", request.toString()); } @@ -79,6 +91,8 @@ public class TwitterFriendsListProviderTask implements Runnable { FriendsListResponse response = client.list(request); + responseList.add(response); + last_count = response.getUsers().size(); if (response.getUsers().size() > 0) { @@ -121,4 +135,9 @@ public class TwitterFriendsListProviderTask implements Runnable { && page_count <= provider.getConfig().getMaxPages()); } + @Override + public Iterator<FriendsListResponse> call() throws Exception { + run(); + return responseList.iterator(); + } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterRetweetsTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterRetweetsTask.java index acbe008..b1b39b1 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterRetweetsTask.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterRetweetsTask.java @@ -32,14 +32,16 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; import java.util.List; +import java.util.concurrent.Callable; import java.util.stream.Collectors; import java.util.stream.Stream; /** * Retrieve recent posts for a single user id. */ -public class TwitterRetweetsTask implements Runnable { +public class TwitterRetweetsTask implements Callable<Iterator<Tweet>>, Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(TwitterRetweetsTask.class); @@ -76,4 +78,9 @@ public class TwitterRetweetsTask implements Runnable { } + @Override + public Iterator<Tweet> call() throws Exception { + run(); + return provider.providerQueue.stream().map(x -> (Tweet)x.getDocument()).iterator(); + } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java index d9a0759..5512d35 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java @@ -25,11 +25,14 @@ import org.apache.streams.core.DatumStatusCounter; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.core.util.ExecutorUtils; +import org.apache.streams.core.util.QueueUtils; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.twitter.config.TwitterTimelineProviderConfiguration; import org.apache.streams.twitter.api.StatusesUserTimelineRequest; import org.apache.streams.twitter.api.Twitter; import org.apache.streams.twitter.converter.TwitterDateTimeFormat; +import org.apache.streams.twitter.pojo.Tweet; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -54,10 +57,13 @@ import java.io.PrintStream; import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Queue; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -71,7 +77,7 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor; /** * Retrieve recent posts from a list of user ids or names. */ -public class TwitterTimelineProvider implements StreamsProvider, Serializable { +public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, StreamsProvider, Serializable { private static final String STREAMS_ID = "TwitterTimelineProvider"; @@ -95,14 +101,17 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { protected int idsCount; protected Twitter client; - protected ListeningExecutorService executor; + protected ExecutorService executor; protected DateTime start; protected DateTime end; + StreamsConfiguration streamsConfiguration; + protected final AtomicBoolean running = new AtomicBoolean(); - protected List<ListenableFuture<Object>> futures = new ArrayList<>(); + private List<Callable<Object>> tasks = new ArrayList<>(); + private List<Future<Object>> futures = new ArrayList<>(); /** * To use from command line: @@ -193,9 +202,11 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { this.config = (TwitterTimelineProviderConfiguration)configurationObject; } + streamsConfiguration = StreamsConfigurator.detectConfiguration(StreamsConfigurator.getConfig()); + try { lock.writeLock().lock(); - providerQueue = constructQueue(); + providerQueue = QueueUtils.constructQueue(); } finally { lock.writeLock().unlock(); } @@ -233,7 +244,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { } executor = MoreExecutors.listeningDecorator( - TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize( + ExecutorUtils.newFixedThreadPoolWithQueueSize( config.getThreadsPerProvider().intValue(), streamsConfiguration.getQueueSize().intValue() ) @@ -241,16 +252,25 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { submitTimelineThreads(ids, names); + LOGGER.info("tasks: {}", tasks.size()); + LOGGER.info("futures: {}", futures.size()); + } @Override public void startStream() { - LOGGER.debug("{} startStream", STREAMS_ID); + Objects.requireNonNull(executor); + + LOGGER.info("startStream"); running.set(true); - executor.shutdown(); + LOGGER.info("running: {}", running.get()); + + ExecutorUtils.shutdownAndAwaitTermination(executor); + + LOGGER.info("running: {}", running.get()); } @@ -260,27 +280,31 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { StatusesUserTimelineRequest request = new StatusesUserTimelineRequest(); request.setUserId(id); request.setCount(config.getPageSize()); - TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask( + Callable providerTask = new TwitterTimelineProviderTask( this, client, request ); - ListenableFuture future = executor.submit(providerTask); + LOGGER.info("Thread Created: {}", request); + tasks.add(providerTask); + Future future = executor.submit(providerTask); futures.add(future); - LOGGER.info("Thread Submitted: {}", providerTask.request); + LOGGER.info("Thread Submitted: {}", request); } for (String name : names) { StatusesUserTimelineRequest request = new StatusesUserTimelineRequest(); request.setScreenName(name); request.setCount(config.getPageSize()); - TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask( + Callable providerTask = new TwitterTimelineProviderTask( this, client, request ); - ListenableFuture future = executor.submit(providerTask); + LOGGER.info("Thread Created: {}", request); + tasks.add(providerTask); + Future future = executor.submit(providerTask); futures.add(future); - LOGGER.info("Thread Submitted: {}", providerTask.request); + LOGGER.info("Thread Submitted: {}", request); } } @@ -294,8 +318,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { try { lock.writeLock().lock(); result = new StreamsResultSet(providerQueue); - result.setCounter(new DatumStatusCounter()); - providerQueue = constructQueue(); + providerQueue = QueueUtils.constructQueue(); } finally { lock.writeLock().unlock(); } @@ -312,10 +335,6 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { } - protected Queue<StreamsDatum> constructQueue() { - return new LinkedBlockingQueue<StreamsDatum>(); - } - public StreamsResultSet readNew(BigInteger sequence) { LOGGER.debug("{} readNew", STREAMS_ID); throw new NotImplementedException(); @@ -338,35 +357,31 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { @Override public void cleanUp() { - shutdownAndAwaitTermination(executor); - } - - void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - System.err.println("Pool did not terminate"); - } - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } + ExecutorUtils.shutdownAndAwaitTermination(executor); } @Override public boolean isRunning() { - if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { - LOGGER.info("Completed"); + LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty()); + LOGGER.debug("providerQueue.size: {}", providerQueue.size()); + LOGGER.debug("executor.isTerminated: {}", executor.isTerminated()); + LOGGER.debug("tasks.size(): {}", tasks.size()); + LOGGER.debug("futures.size(): {}", futures.size()); + if ( tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() ) { running.set(false); - LOGGER.info("Exiting"); } + LOGGER.debug("isRunning: ", running.get()); return running.get(); } + + @Override + public Iterator<Tweet> call() { + prepare(config); + startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + } while ( isRunning()); + cleanUp(); + return providerQueue.stream().map( x -> (Tweet)x.getDocument()).iterator(); + } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java index f8816e0..6bc9822 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java @@ -30,14 +30,17 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.concurrent.Callable; import java.util.stream.Collectors; import java.util.stream.Stream; /** * Retrieve recent posts for a single user id. */ -public class TwitterTimelineProviderTask implements Runnable { +public class TwitterTimelineProviderTask implements Callable<Iterator<Tweet>>, Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProviderTask.class); @@ -46,6 +49,7 @@ public class TwitterTimelineProviderTask implements Runnable { protected TwitterTimelineProvider provider; protected Twitter client; protected StatusesUserTimelineRequest request; + protected List<Tweet> responseList; /** * TwitterTimelineProviderTask constructor. @@ -68,10 +72,14 @@ public class TwitterTimelineProviderTask implements Runnable { LOGGER.info("Thread Starting: {}", request.toString()); + responseList = new ArrayList<>(); + do { List<Tweet> statuses = client.userTimeline(request); + responseList.addAll(statuses); + last_count = statuses.size(); if( statuses.size() > 0 ) { @@ -106,5 +114,9 @@ public class TwitterTimelineProviderTask implements Runnable { } - + @Override + public Iterator<Tweet> call() throws Exception { + run(); + return responseList.iterator(); + } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java index 90cd23d..5e9970a 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java @@ -25,12 +25,15 @@ import org.apache.streams.core.DatumStatusCounter; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.core.util.ExecutorUtils; +import org.apache.streams.core.util.QueueUtils; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.twitter.config.TwitterFollowingConfiguration; import org.apache.streams.twitter.config.TwitterUserInformationConfiguration; import org.apache.streams.twitter.api.Twitter; import org.apache.streams.twitter.api.UsersLookupRequest; import org.apache.streams.twitter.converter.TwitterDateTimeFormat; +import org.apache.streams.twitter.pojo.User; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -56,10 +59,12 @@ import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -73,7 +78,7 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor; /** * Retrieve current profile status from a list of user ids or names. */ -public class TwitterUserInformationProvider implements StreamsProvider, Serializable { +public class TwitterUserInformationProvider implements Callable<Iterator<User>>, StreamsProvider, Serializable { private static final String STREAMS_ID = "TwitterUserInformationProvider"; @@ -92,6 +97,8 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ protected volatile Queue<StreamsDatum> providerQueue; + StreamsConfiguration streamsConfiguration; + public TwitterUserInformationConfiguration getConfig() { return config; } @@ -171,11 +178,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ } // TODO: this should be abstracted out - public static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) { - return new ThreadPoolExecutor(numThreads, numThreads, - 5000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); - } + /** * TwitterUserInformationProvider constructor. @@ -214,7 +217,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ Objects.requireNonNull(config.getInfo()); Objects.requireNonNull(config.getThreadsPerProvider()); - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(); + streamsConfiguration = StreamsConfigurator.detectConfiguration(); Objects.requireNonNull(streamsConfiguration.getQueueSize()); @@ -228,7 +231,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ try { lock.writeLock().lock(); - providerQueue = constructQueue(); + providerQueue = QueueUtils.constructQueue(); } finally { lock.writeLock().unlock(); } @@ -250,7 +253,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ } executor = MoreExecutors.listeningDecorator( - TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize( + ExecutorUtils.newFixedThreadPoolWithQueueSize( config.getThreadsPerProvider().intValue(), streamsConfiguration.getQueueSize().intValue() ) @@ -332,8 +335,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ try { lock.writeLock().lock(); result = new StreamsResultSet(providerQueue); - result.setCounter(new DatumStatusCounter()); - providerQueue = constructQueue(); + providerQueue = QueueUtils.constructQueue(); LOGGER.debug("readCurrent: {} Documents", result.size()); } finally { lock.writeLock().unlock(); @@ -343,10 +345,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ } - protected Queue<StreamsDatum> constructQueue() { - return new LinkedBlockingQueue<>(); - } - public StreamsResultSet readNew(BigInteger sequence) { LOGGER.debug("{} readNew", STREAMS_ID); throw new NotImplementedException(); @@ -372,31 +370,23 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ return running.get(); } - void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - System.err.println("Pool did not terminate"); - } - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } - protected Twitter getTwitterClient() throws InstantiationException { return Twitter.getInstance(config); } @Override public void cleanUp() { - shutdownAndAwaitTermination(executor); + ExecutorUtils.shutdownAndAwaitTermination(executor); + } + + @Override + public Iterator<User> call() throws Exception { + prepare(config); + startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + } while ( isRunning()); + cleanUp(); + return providerQueue.stream().map( x -> (User)x.getDocument()).iterator(); } } -- To stop receiving notification emails like this one, please contact sblack...@apache.org.