employ args to simplify test and provider command line
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d9e58cdd Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d9e58cdd Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d9e58cdd Branch: refs/heads/master Commit: d9e58cdd67020520d592aad621b3aff6a8249537 Parents: 0813b11 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Authored: Tue Oct 4 20:25:44 2016 -0500 Committer: Steve Blackmon @steveblackmon <sblack...@apache.org> Committed: Tue Oct 4 20:25:44 2016 -0500 ---------------------------------------------------------------------- .../streams-provider-twitter/pom.xml | 12 ++++ .../provider/TwitterTimelineProvider.java | 64 +++++++++++++++----- .../provider/TwitterTimelineProviderIT.java | 35 +---------- .../resources/TwitterTimelineProviderIT.conf | 4 ++ .../resources/TwitterTimelineProviderTest.conf | 4 -- 5 files changed, 69 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d9e58cdd/streams-contrib/streams-provider-twitter/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml index 903d3a7..7ec0908 100644 --- a/streams-contrib/streams-provider-twitter/pom.xml +++ b/streams-contrib/streams-provider-twitter/pom.xml @@ -209,6 +209,18 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-resources-plugin</artifactId> </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.5.0</version> + <executions> + <execution> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> </project> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d9e58cdd/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java index b8653b8..61cddaf 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java @@ -24,8 +24,12 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Queues; import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; import org.apache.commons.lang.NotImplementedException; import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.DatumStatusCounter; import org.apache.streams.core.StreamsDatum; @@ -42,6 +46,11 @@ import org.slf4j.LoggerFactory; import twitter4j.*; import twitter4j.conf.ConfigurationBuilder; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.PrintStream; import java.io.Serializable; import java.math.BigInteger; import java.util.*; @@ -55,8 +64,22 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Retrieve recent posts from a list of user ids or names. + * + * To use from command line: + * + * Supply (at least) the following required configuration in application.conf: + * + * twitter.oauth.consumerKey + * twitter.oauth.consumerSecret + * twitter.oauth.accessToken + * twitter.oauth.accessTokenSecret + * twitter.info + * + * Launch using: + * + * mvn exec:java -Dexec.mainClass=org.apache.streams.twitter.provider.TwitterTimelineProvider -Dexec.args="application.conf tweets.json" */ -public class TwitterTimelineProvider implements StreamsProvider, Serializable, Runnable { +public class TwitterTimelineProvider implements StreamsProvider, Serializable { public final static String STREAMS_ID = "TwitterTimelineProvider"; @@ -64,30 +87,43 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable, R private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); - public static void main(String[] args) { - TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration("twitter"); + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File conf_file = new File(configfile); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter"); TwitterTimelineProvider provider = new TwitterTimelineProvider(config); - provider.run(); - } - @Override - public void run() { - prepare(config); - startStream(); + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); do { - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - Iterator<StreamsDatum> iterator = readCurrent().iterator(); + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); while(iterator.hasNext()) { StreamsDatum datum = iterator.next(); String json; try { json = MAPPER.writeValueAsString(datum.getDocument()); - System.out.println(json); + outStream.println(json); } catch (JsonProcessingException e) { System.err.println(e.getMessage()); } } - } while( isRunning()); + } while( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); } public static final int MAX_NUMBER_WAITING = 10000; @@ -189,7 +225,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable, R StreamsResultSet result; - LOGGER.info("Providing {} docs", providerQueue.size()); + LOGGER.debug("Providing {} docs", providerQueue.size()); try { lock.writeLock().lock(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d9e58cdd/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java index e0f3b6a..f21a87e 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java @@ -43,26 +43,10 @@ public class TwitterTimelineProviderIT { @Test public void testTwitterTimelineProvider() throws Exception { - PrintStream stdout = new PrintStream(new BufferedOutputStream(new FileOutputStream("./target/test-classes/TwitterTimelineProviderTest.stdout.txt"))); - PrintStream stderr = new PrintStream(new BufferedOutputStream(new FileOutputStream("./target/test-classes/TwitterTimelineProviderTest.stderr.txt"))); + String configfile = "./target/test-classes/TwitterTimelineProviderIT.conf"; + String outfile = "./target/test-classes/TwitterTimelineProviderIT.txt"; - System.setOut(stdout); - System.setErr(stderr); - - Config reference = ConfigFactory.load(); - File conf_file = new File("target/test-classes/TwitterTimelineProviderTest.conf"); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); - TwitterUserInformationConfiguration testConfig = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe.getConfig("twitter")); - - TwitterTimelineProvider provider = new TwitterTimelineProvider(testConfig); - provider.run(); - - stdout.flush(); - stderr.flush(); + TwitterTimelineProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2])); File out = new File("target/test-classes/TwitterTimelineProviderTest.stdout.txt"); assert (out.exists()); @@ -76,18 +60,5 @@ public class TwitterTimelineProviderIT { assert (outCounter.getLineNumber() == 1000); - File err = new File("target/test-classes/TwitterTimelineProviderTest.stderr.txt"); - assert (err.exists()); - assert (err.canRead()); - assert (err.isFile()); - - FileReader errReader = new FileReader(err); - LineNumberReader errCounter = new LineNumberReader(errReader); - - while(errCounter.readLine() != null) {} - - assert (errCounter.getLineNumber() == 0); - - } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d9e58cdd/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderIT.conf ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderIT.conf b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderIT.conf new file mode 100644 index 0000000..a7862c4 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderIT.conf @@ -0,0 +1,4 @@ +twitter.info = [ + 18055613 +] +twitter.max_items = 1000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d9e58cdd/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf deleted file mode 100644 index a7862c4..0000000 --- a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf +++ /dev/null @@ -1,4 +0,0 @@ -twitter.info = [ - 18055613 -] -twitter.max_items = 1000 \ No newline at end of file