This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 210d828 Added filtering to Twitter Firehose Connector (#3298) 210d828 is described below commit 210d828419f8247e6999051c66ee99be544a6221 Author: David Kjerrumgaard <35466513+david-stream...@users.noreply.github.com> AuthorDate: Fri Jan 11 10:29:36 2019 -0800 Added filtering to Twitter Firehose Connector (#3298) --- pulsar-io/twitter/pom.xml | 12 +++ .../apache/pulsar/io/twitter/TwitterFireHose.java | 109 +++++++------------- .../pulsar/io/twitter/TwitterFireHoseConfig.java | 88 +++++++++++++++-- .../pulsar/io/twitter/{ => data}/TweetData.java | 24 ++++- .../pulsar/io/twitter/data/TwitterRecord.java | 66 +++++++++++++ .../pulsar/io/twitter/data/package-info.java | 19 ++++ .../twitter/endpoint/SampleStatusesEndpoint.java | 42 ++++++++ .../pulsar/io/twitter/endpoint/package-info.java | 19 ++++ .../org/apache/pulsar/io/twitter/package-info.java | 19 ++++ .../io/twitter/TwitterFireHoseConfigTests.java | 110 +++++++++++++++++++++ .../twitter/src/test/resources/sourceConfig.yaml | 23 +++++ 11 files changed, 443 insertions(+), 88 deletions(-) diff --git a/pulsar-io/twitter/pom.xml b/pulsar-io/twitter/pom.xml index cfa3cf6..48ea652 100644 --- a/pulsar-io/twitter/pom.xml +++ b/pulsar-io/twitter/pom.xml @@ -53,6 +53,18 @@ <artifactId>hbc-core</artifactId> <version>${hbc-core.version}</version> </dependency> + + <dependency> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + <version>${commons.collections.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.4</version> + </dependency> </dependencies> diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java index e1d0545..1219e0b 100644 --- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java +++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java @@ -24,7 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.twitter.hbc.ClientBuilder; import com.twitter.hbc.common.DelimitedStreamReader; import com.twitter.hbc.core.Constants; -import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint; +import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint; import com.twitter.hbc.core.endpoint.StreamingEndpoint; import com.twitter.hbc.core.processor.HosebirdMessageProcessor; import com.twitter.hbc.httpclient.BasicClient; @@ -33,23 +33,24 @@ import com.twitter.hbc.httpclient.auth.OAuth1; import java.io.IOException; import java.io.InputStream; -import java.io.Serializable; -import java.text.SimpleDateFormat; -import java.util.Date; +import java.util.List; import java.util.Map; -import java.util.Optional; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.functions.api.Record; + +import org.apache.commons.collections.CollectionUtils; import org.apache.pulsar.io.core.PushSource; import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.Connector; import org.apache.pulsar.io.core.annotations.IOType; +import org.apache.pulsar.io.twitter.data.TweetData; +import org.apache.pulsar.io.twitter.data.TwitterRecord; +import org.apache.pulsar.io.twitter.endpoint.SampleStatusesEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Simple Push based Twitter FireHose Source + * Simple Push based Twitter FireHose Source. */ @Connector( name = "twitter", @@ -62,22 +63,16 @@ public class TwitterFireHose extends PushSource<TweetData> { private static final Logger LOG = LoggerFactory.getLogger(TwitterFireHose.class); - // ----- Fields set by the constructor - // ----- Runtime fields private Object waitObject; - private final ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + private final ObjectMapper mapper = new ObjectMapper().configure( + DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); @Override public void open(Map<String, Object> config, SourceContext sourceContext) throws IOException { TwitterFireHoseConfig hoseConfig = TwitterFireHoseConfig.load(config); - if (hoseConfig.getConsumerKey() == null - || hoseConfig.getConsumerSecret() == null - || hoseConfig.getToken() == null - || hoseConfig.getTokenSecret() == null) { - throw new IllegalArgumentException("Required property not set."); - } + hoseConfig.validate(); waitObject = new Object(); startThread(hoseConfig); } @@ -87,40 +82,13 @@ public class TwitterFireHose extends PushSource<TweetData> { stopThread(); } - // ------ Custom endpoints - - /** - * Implementing this interface allows users of this source to set a custom endpoint. - */ - public interface EndpointInitializer { - StreamingEndpoint createEndpoint(); - } - - /** - * Required for Twitter Client - */ - private static class SampleStatusesEndpoint implements EndpointInitializer, Serializable { - @Override - public StreamingEndpoint createEndpoint() { - // this default endpoint initializer returns the sample endpoint: Returning a sample from the firehose (all tweets) - StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint(); - endpoint.stallWarnings(false); - endpoint.delimited(false); - return endpoint; - } - } - private void startThread(TwitterFireHoseConfig config) { - Authentication auth = new OAuth1(config.getConsumerKey(), - config.getConsumerSecret(), - config.getToken(), - config.getTokenSecret()); BasicClient client = new ClientBuilder() .name(config.getClientName()) .hosts(config.getClientHosts()) - .endpoint(new SampleStatusesEndpoint().createEndpoint()) - .authentication(auth) + .endpoint(getEndpoint(config)) + .authentication(getAuthentication(config)) .processor(new HosebirdMessageProcessor() { public DelimitedStreamReader reader; @@ -176,42 +144,31 @@ public class TwitterFireHose extends PushSource<TweetData> { } } - static private class TwitterRecord implements Record<TweetData> { - private final TweetData tweet; - private static SimpleDateFormat dateFormat = new SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy"); - private final boolean guestimateTweetTime; + private Authentication getAuthentication(TwitterFireHoseConfig config) { + return new OAuth1(config.getConsumerKey(), + config.getConsumerSecret(), + config.getToken(), + config.getTokenSecret()); + } - public TwitterRecord(TweetData tweet, boolean guestimateTweetTime) { - this.tweet = tweet; - this.guestimateTweetTime = guestimateTweetTime; - } + private StreamingEndpoint getEndpoint(TwitterFireHoseConfig config) { + List<Long> followings = config.getFollowings(); + List<String> terms = config.getTrackTerms(); - @Override - public Optional<String> getKey() { - // TODO: Could use user or tweet ID as key here - return Optional.empty(); - } + if (CollectionUtils.isEmpty(followings) && CollectionUtils.isEmpty(terms)) { + return new SampleStatusesEndpoint().createEndpoint(); + } else { + StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint(); - @Override - public Optional<Long> getEventTime() { - try { - if (tweet.getCreatedAt() != null) { - Date d = dateFormat.parse(tweet.getCreatedAt()); - return Optional.of(d.toInstant().toEpochMilli()); - } else if (guestimateTweetTime) { - return Optional.of(System.currentTimeMillis()); - } else { - return Optional.empty(); - } - } catch (Exception e) { - return Optional.empty(); + if (CollectionUtils.isNotEmpty(followings)) { + hosebirdEndpoint.followings(followings); } - } - @Override - public TweetData getValue() { - return tweet; + if (CollectionUtils.isNotEmpty(terms)) { + hosebirdEndpoint.trackTerms(terms); + } + + return hosebirdEndpoint; } } - } diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java index c9bf4df..3d361af 100644 --- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java +++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java @@ -21,16 +21,31 @@ package org.apache.pulsar.io.twitter; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.common.collect.Lists; +import com.twitter.hbc.core.Constants; + import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; -import com.twitter.hbc.core.Constants; -import lombok.*; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; import lombok.experimental.Accessors; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.io.core.annotations.FieldDoc; +/** + * Configuration object for the Twitter Firehose Connector. + */ @Data @Setter @Getter @@ -44,34 +59,42 @@ public class TwitterFireHoseConfig implements Serializable { @FieldDoc( required = true, defaultValue = "", - help = "Your twitter app consumer key. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details" + help = "Your twitter app consumer key. See " + + "https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details" ) private String consumerKey; + @FieldDoc( required = true, defaultValue = "", - help = "Your twitter app consumer secret. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details" + help = "Your twitter app consumer secret. " + + "See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details" ) private String consumerSecret; + @FieldDoc( required = true, defaultValue = "", - help = "Your twitter app token. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details" + help = "Your twitter app token. " + + "See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details" ) private String token; + @FieldDoc( required = true, defaultValue = "", - help = "Your twitter app token secret. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details" + help = "Your twitter app token secret. " + + "See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details" ) private String tokenSecret; + // Most firehose events have null createdAt time. If this parameter is set to true // then we estimate the createdTime of each firehose event to be current time. @FieldDoc( required = false, defaultValue = "false", - help = "Most firehose events have null createdAt time." - + " If this parameter is set to true, the connector estimates the createdTime of each firehose event to be current time." + help = "Most firehose events have null createdAt time.If this parameter is set to true, " + + "the connector estimates the createdTime of each firehose event to be current time." ) private Boolean guestimateTweetTime = false; @@ -83,12 +106,14 @@ public class TwitterFireHoseConfig implements Serializable { help = "The Twitter Firehose Client name" ) private String clientName = "pulsario-twitter-source"; + @FieldDoc( required = false, defaultValue = Constants.STREAM_HOST, help = "The Twitter Firehose stream hosts that the connector connects to" ) private String clientHosts = Constants.STREAM_HOST; + @FieldDoc( required = false, defaultValue = "50000", @@ -96,6 +121,20 @@ public class TwitterFireHoseConfig implements Serializable { ) private int clientBufferSize = 50000; + @FieldDoc( + required = false, + defaultValue = "", + help = "A comma separated list of user IDs, indicating the users to return statuses for in the stream." + ) + private String followings; + + @FieldDoc( + required = false, + defaultValue = "", + help = "Keywords to track. Phrases of keywords are specified by a comma-separated list." + ) + private String terms; + public static TwitterFireHoseConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(yamlFile), TwitterFireHoseConfig.class); @@ -105,4 +144,37 @@ public class TwitterFireHoseConfig implements Serializable { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(new ObjectMapper().writeValueAsString(map), TwitterFireHoseConfig.class); } + + public void validate() throws IllegalArgumentException { + if (getConsumerKey() == null || getConsumerSecret() == null + || getToken() == null || getTokenSecret() == null) { + throw new IllegalArgumentException("Required property not set."); + } + } + + public List<Long> getFollowings() { + if (StringUtils.isBlank(followings)) { + return Collections.emptyList(); + } + + List<Long> result = new ArrayList<Long> (); + + for (String s: StringUtils.split(followings, ",")) { + try { + result.add(Long.parseLong(StringUtils.trim(s))); + } catch (NumberFormatException nfEx) { + // Ignore these + } + } + + return CollectionUtils.isEmpty(result) ? Collections.emptyList() : result; + } + + public List<String> getTrackTerms() { + if (StringUtils.isBlank(terms)) { + return Collections.emptyList(); + } + + return Lists.newArrayList(StringUtils.split(terms, ",")); + } } \ No newline at end of file diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TweetData.java similarity index 91% rename from pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java rename to pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TweetData.java index e5cb79c..c06f379 100644 --- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java +++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TweetData.java @@ -17,11 +17,13 @@ * under the License. */ -package org.apache.pulsar.io.twitter; +package org.apache.pulsar.io.twitter.data; -import java.util.List; import lombok.Data; +/** + * POJO for Tweet object. + */ @Data public class TweetData { private String createdAt; @@ -44,7 +46,9 @@ public class TweetData { private String timestampMs; private Delete delete; - + /** + * POJO for Twitter User object. + */ @Data public static class User { private Long id; @@ -54,7 +58,7 @@ public class TweetData { private String location; private String description; private String translatorType; - private Boolean _protected; + private Boolean protectedUser; private Boolean verified; private Long followersCount; private Long friendsCount; @@ -81,6 +85,10 @@ public class TweetData { private Boolean defaultProfile; private Boolean defaultProfileImage; } + + /** + * POJO for Re-Tweet object. + */ @Data public static class RetweetedStatus { private String createdAt; @@ -100,6 +108,10 @@ public class TweetData { private String filterLevel; private String lang; } + + /** + * POJO for Tweet Status object. + */ @Data public static class Status { private Long id; @@ -107,6 +119,10 @@ public class TweetData { private Long userId; private String userIdStr; } + + /** + * POJO for Tweet Delete object. + */ @Data public static class Delete { private Status status; diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TwitterRecord.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TwitterRecord.java new file mode 100644 index 0000000..3a9f968 --- /dev/null +++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TwitterRecord.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.twitter.data; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Optional; + +import org.apache.pulsar.functions.api.Record; + +/** + * Twitter Record object. + */ +public class TwitterRecord implements Record<TweetData> { + private final TweetData tweet; + private static SimpleDateFormat dateFormat = new SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy"); + private final boolean guestimateTweetTime; + + public TwitterRecord(TweetData tweet, boolean guestimateTweetTime) { + this.tweet = tweet; + this.guestimateTweetTime = guestimateTweetTime; + } + + @Override + public Optional<String> getKey() { + // TODO: Could use user or tweet ID as key here + return Optional.empty(); + } + + @Override + public Optional<Long> getEventTime() { + try { + if (tweet.getCreatedAt() != null) { + Date d = dateFormat.parse(tweet.getCreatedAt()); + return Optional.of(d.toInstant().toEpochMilli()); + } else if (guestimateTweetTime) { + return Optional.of(System.currentTimeMillis()); + } else { + return Optional.empty(); + } + } catch (Exception e) { + return Optional.empty(); + } + } + + @Override + public TweetData getValue() { + return tweet; + } +} \ No newline at end of file diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/package-info.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/package-info.java new file mode 100644 index 0000000..d7f9649 --- /dev/null +++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.twitter.data; \ No newline at end of file diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/SampleStatusesEndpoint.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/SampleStatusesEndpoint.java new file mode 100644 index 0000000..85564fd --- /dev/null +++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/SampleStatusesEndpoint.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.twitter.endpoint; + +import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint; +import com.twitter.hbc.core.endpoint.StreamingEndpoint; + +import java.io.Serializable; + +/** + * Required for Twitter Client. + */ +public class SampleStatusesEndpoint implements Serializable { + /** + * + */ + private static final long serialVersionUID = 1L; + + public StreamingEndpoint createEndpoint() { + // Returns the sample endpoint: Returning a sample from the firehose (all tweets) + StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint(); + endpoint.stallWarnings(false); + endpoint.delimited(false); + return endpoint; + } +} \ No newline at end of file diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/package-info.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/package-info.java new file mode 100644 index 0000000..6e99d77 --- /dev/null +++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.twitter.endpoint; \ No newline at end of file diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/package-info.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/package-info.java new file mode 100644 index 0000000..1690a64 --- /dev/null +++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.twitter; \ No newline at end of file diff --git a/pulsar-io/twitter/src/test/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfigTests.java b/pulsar-io/twitter/src/test/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfigTests.java new file mode 100644 index 0000000..689d81d --- /dev/null +++ b/pulsar-io/twitter/src/test/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfigTests.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.twitter; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.testng.annotations.Test; + +public class TwitterFireHoseConfigTests { + + private TwitterFireHoseConfig config; + + @Test + public final void loadFromYamlFileTest() throws IOException { + File yamlFile = getFile("sourceConfig.yaml"); + config = TwitterFireHoseConfig.load(yamlFile.getAbsolutePath()); + assertNotNull(config); + } + + @Test + public final void loadFromMapTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("consumerKey", "xxx"); + map.put("consumerSecret", "xxx"); + map.put("token", "xxx"); + map.put("tokenSecret", "xxx"); + + config = TwitterFireHoseConfig.load(map); + + assertNotNull(config); + } + + @Test + public final void validValidateTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("consumerKey", "xxx"); + map.put("consumerSecret", "xxx"); + map.put("token", "xxx"); + map.put("tokenSecret", "xxx"); + + config = TwitterFireHoseConfig.load(map); + config.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Required property not set.") + public final void missingConsumerKeyValidateTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + + config = TwitterFireHoseConfig.load(map); + config.validate(); + } + + @Test + public final void getFollowingsTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("followings", "123, 456, 789"); + config = TwitterFireHoseConfig.load(map); + + List<Long> followings = config.getFollowings(); + assertNotNull(followings); + assertEquals(followings.size(), 3); + assertTrue(followings.contains(123L)); + assertTrue(followings.contains(456L)); + assertTrue(followings.contains(789L)); + } + + + @Test + public final void getTermsTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("terms", "mickey, donald, goofy"); + config = TwitterFireHoseConfig.load(map); + + List<String> terms = config.getTrackTerms(); + assertNotNull(terms); + assertEquals(terms.size(), 3); + assertTrue(terms.contains("mickey")); + } + + private File getFile(String name) { + ClassLoader classLoader = getClass().getClassLoader(); + return new File(classLoader.getResource(name).getFile()); + } + +} diff --git a/pulsar-io/twitter/src/test/resources/sourceConfig.yaml b/pulsar-io/twitter/src/test/resources/sourceConfig.yaml new file mode 100644 index 0000000..9ac5708 --- /dev/null +++ b/pulsar-io/twitter/src/test/resources/sourceConfig.yaml @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +{ +"consumerKey": "", +"consumerSecret": "" +} \ No newline at end of file