Xikui Wang has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1272
Change subject: Add user-stream for Twitter Adaptor ...................................................................... Add user-stream for Twitter Adaptor 1. Add a user-stream option for the Twitter Adaptor 2. Refactor part of TwitterRecordReaderFactory 3. To create a user-stream feed, use the following DDL: create feed TwitterFeed using twitter_user_stream( ("format"="twitter-status"), ("type-name"="Tweet"), ... // the rest is the same as for a push feed Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838 --- M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java 5 files changed, 278 insertions(+), 93 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/72/1272/1 diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java index 9ead8a9..c296bc6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java @@ -26,6 +26,8 @@ import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.asterix.external.input.record.GenericRecord; import 
org.apache.asterix.external.util.FeedLogManager; +import org.apache.asterix.external.util.TwitterUtil; +import twitter4j.DirectMessage; import twitter4j.FilterQuery; import twitter4j.StallWarning; import twitter4j.Status; @@ -33,27 +35,43 @@ import twitter4j.StatusListener; import twitter4j.TwitterObjectFactory; import twitter4j.TwitterStream; +import twitter4j.User; +import twitter4j.UserList; +import twitter4j.UserStreamListener; public class TwitterPushRecordReader implements IRecordReader<String> { private LinkedBlockingQueue<String> inputQ; private TwitterStream twitterStream; private GenericRecord<String> record; + private StatusListener tweetListener; private boolean closed = false; - public TwitterPushRecordReader(TwitterStream twitterStream, FilterQuery query) { - record = new GenericRecord<>(); - inputQ = new LinkedBlockingQueue<>(); - this.twitterStream = twitterStream;//TwitterUtil.getTwitterStream(configuration); - this.twitterStream.addListener(new TweetListener(inputQ)); + public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.TweetListener tweetListener, + FilterQuery query) { + init(twitterStream); + tweetListener.setInputQ(inputQ); + this.twitterStream.addListener(tweetListener); this.twitterStream.filter(query); } - public TwitterPushRecordReader(TwitterStream twitterStream) { + public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.TweetListener tweetListener) { + init(twitterStream); + tweetListener.setInputQ(inputQ); + this.twitterStream.addListener(tweetListener); + twitterStream.sample(); + } + + public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.UserTweetsListener tweetListener) { + init(twitterStream); + tweetListener.setInputQ(inputQ); + this.twitterStream.addListener(tweetListener); + twitterStream.user(); + } + + private void init(TwitterStream twitterStream) { record = new GenericRecord<>(); inputQ = new LinkedBlockingQueue<>(); - this.twitterStream = twitterStream;// - 
this.twitterStream.addListener(new TweetListener(inputQ)); - twitterStream.sample(); + this.twitterStream = twitterStream; } @Override @@ -89,46 +107,6 @@ return false; } return true; - } - - private class TweetListener implements StatusListener { - - private LinkedBlockingQueue<String> inputQ; - - public TweetListener(LinkedBlockingQueue<String> inputQ) { - this.inputQ = inputQ; - } - - @Override - public void onStatus(Status tweet) { - String jsonTweet = TwitterObjectFactory.getRawJSON(tweet); - inputQ.add(jsonTweet); - } - - @Override - public void onException(Exception arg0) { - // do nothing - } - - @Override - public void onDeletionNotice(StatusDeletionNotice arg0) { - // do nothing - } - - @Override - public void onScrubGeo(long arg0, long arg1) { - // do nothing - } - - @Override - public void onStallWarning(StallWarning arg0) { - // do nothing - } - - @Override - public void onTrackLimitationNotice(int arg0) { - // do nothing - } } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java index 73d1b39..c2af93f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java @@ -73,39 +73,36 @@ builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET); throw new AsterixException(builder.toString()); } - if (TwitterRecordReaderFactory.isTwitterPull(configuration)) { - pull = true; - if (configuration.get(SearchAPIConstants.QUERY) == null) { - throw new AsterixException( - "parameter " + SearchAPIConstants.QUERY + " not specified as part of adaptor configuration"); - } - String interval = 
configuration.get(SearchAPIConstants.INTERVAL); - if (interval != null) { - try { - Integer.parseInt(interval); - } catch (NumberFormatException nfe) { - throw new IllegalArgumentException( - "parameter " + SearchAPIConstants.INTERVAL + " is defined incorrectly, expecting a number"); - } - } else { - configuration.put(SearchAPIConstants.INTERVAL, DEFAULT_INTERVAL); - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning(" Parameter " + SearchAPIConstants.INTERVAL + " not defined, using default (" - + DEFAULT_INTERVAL + ")"); - } - } - } else { - pull = false; - } - } - public static boolean isTwitterPull(Map<String, String> configuration) { - String reader = configuration.get(ExternalDataConstants.KEY_READER); - if (reader.equals(ExternalDataConstants.READER_TWITTER_PULL) - || reader.equals(ExternalDataConstants.READER_PULL_TWITTER)) { - return true; + switch (configuration.get(ExternalDataConstants.KEY_READER)) { + case ExternalDataConstants.READER_PULL_TWITTER: + if (configuration.get(SearchAPIConstants.QUERY) == null) { + throw new AsterixException("parameter " + SearchAPIConstants.QUERY + + " not specified as part of adaptor configuration"); + } + String interval = configuration.get(SearchAPIConstants.INTERVAL); + if (interval != null) { + try { + Integer.parseInt(interval); + } catch (NumberFormatException nfe) { + throw new IllegalArgumentException("parameter " + SearchAPIConstants.INTERVAL + + " is defined incorrectly, expecting a number"); + } + } else { + configuration.put(SearchAPIConstants.INTERVAL, DEFAULT_INTERVAL); + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.warning(" Parameter " + SearchAPIConstants.INTERVAL + " not defined, using default (" + + DEFAULT_INTERVAL + ")"); + } + } + break; + case ExternalDataConstants.READER_PUSH_TWITTER: + // do nothing. + break; + case ExternalDataConstants.READER_USER_STREAM_TWITTER: + // do nothing. + break; } - return false; } @Override @@ -116,20 +113,34 @@ @Override public IRecordReader<? 
extends String> createRecordReader(IHyracksTaskContext ctx, int partition) throws HyracksDataException { - if (pull) { - return new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration), - configuration.get(SearchAPIConstants.QUERY), - Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL))); - } else { - FilterQuery query; - try { - query = TwitterUtil.getFilterQuery(configuration); - return (query == null) ? new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration)) - : new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration), query); - } catch (AsterixException e) { - throw new HyracksDataException(e); - } + IRecordReader<? extends String> recordReader; + switch (configuration.get(ExternalDataConstants.KEY_READER)) { + case ExternalDataConstants.READER_PULL_TWITTER: + recordReader = new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration), + configuration.get(SearchAPIConstants.QUERY), + Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL))); + break; + case ExternalDataConstants.READER_PUSH_TWITTER: + FilterQuery query; + try { + query = TwitterUtil.getFilterQuery(configuration); + recordReader = (query == null) + ? 
new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration), + TwitterUtil.getTweetListener()) + : new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration), + TwitterUtil.getTweetListener(), query); + } catch (AsterixException e) { + throw new HyracksDataException(e); + } + break; + case ExternalDataConstants.READER_USER_STREAM_TWITTER: + recordReader = new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration), + TwitterUtil.getUserTweetsListener()); + break; + default: + throw new HyracksDataException("No Record reader found!"); } + return recordReader; } @Override @@ -147,4 +158,5 @@ } return true; } + } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java index ad11171..d3faf84 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java @@ -101,6 +101,7 @@ case ExternalDataConstants.READER_TWITTER_PUSH: case ExternalDataConstants.READER_PUSH_TWITTER: case ExternalDataConstants.READER_PULL_TWITTER: + case ExternalDataConstants.READER_USER_STREAM_TWITTER: return new TwitterRecordReaderFactory(); case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER: return new StreamRecordReaderFactory(new TwitterFirehoseStreamFactory()); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index c5167c1..fa337fa 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -128,6 +128,7 @@ public static final String READER_PUSH_TWITTER = "push_twitter"; public static final String READER_TWITTER_PULL = "twitter_pull"; public static final String READER_PULL_TWITTER = "pull_twitter"; + public static final String READER_USER_STREAM_TWITTER = "twitter_user_stream"; public static final String CLUSTER_LOCATIONS = "cluster-locations"; public static final String SCHEDULER = "hdfs-scheduler"; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java index 70d31c0..dedba40 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java @@ -19,17 +19,27 @@ package org.apache.asterix.external.util; import org.apache.asterix.common.exceptions.AsterixException; +import twitter4j.DirectMessage; import twitter4j.FilterQuery; +import twitter4j.StallWarning; +import twitter4j.Status; +import twitter4j.StatusDeletionNotice; +import twitter4j.StatusListener; import twitter4j.Twitter; import twitter4j.TwitterFactory; +import twitter4j.TwitterObjectFactory; import twitter4j.TwitterStream; import twitter4j.TwitterStreamFactory; +import twitter4j.User; +import twitter4j.UserList; +import twitter4j.UserStreamListener; import twitter4j.conf.ConfigurationBuilder; import java.io.InputStream; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Matcher; @@ -275,4 +285,187 @@ public static final String INTERVAL = "interval"; } + public static UserTweetsListener getUserTweetsListener() { + return new 
UserTweetsListener(); + } + + public static TweetListener getTweetListener() { + return new TweetListener(); + } + + public static class UserTweetsListener implements UserStreamListener { + + private LinkedBlockingQueue<String> inputQ; + + public void setInputQ(LinkedBlockingQueue<String> inputQ) { + this.inputQ = inputQ; + } + + @Override + public void onDeletionNotice(long l, long l1) { + //do nothing + } + + @Override + public void onFriendList(long[] longs) { + //do nothing + } + + @Override + public void onFavorite(User user, User user1, Status status) { + //do nothing + } + + @Override + public void onUnfavorite(User user, User user1, Status status) { + //do nothing + } + + @Override + public void onFollow(User user, User user1) { + //do nothing + } + + @Override + public void onUnfollow(User user, User user1) { + //do nothing + } + + @Override + public void onDirectMessage(DirectMessage directMessage) { + //do nothing + } + + @Override + public void onUserListMemberAddition(User user, User user1, UserList userList) { + //do nothing + } + + @Override + public void onUserListMemberDeletion(User user, User user1, UserList userList) { + //do nothing + } + + @Override + public void onUserListSubscription(User user, User user1, UserList userList) { + //do nothing + } + + @Override + public void onUserListUnsubscription(User user, User user1, UserList userList) { + //do nothing + } + + @Override + public void onUserListCreation(User user, UserList userList) { + //do nothing + } + + @Override + public void onUserListUpdate(User user, UserList userList) { + //do nothing + } + + @Override + public void onUserListDeletion(User user, UserList userList) { + //do nothing + } + + @Override + public void onUserProfileUpdate(User user) { + //do nothing + } + + @Override + public void onUserSuspension(long l) { + //do nothing + } + + @Override + public void onUserDeletion(long l) { + //do nothing + } + + @Override + public void onBlock(User user, User user1) { + //do nothing 
+ } + + @Override + public void onUnblock(User user, User user1) { + //do nothing + } + + @Override + public void onStatus(Status status) { + String jsonTweet = TwitterObjectFactory.getRawJSON(status); + inputQ.add(jsonTweet); + } + + @Override + public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) { + //do nothing + } + + @Override + public void onTrackLimitationNotice(int i) { + //do nothing + } + + @Override + public void onScrubGeo(long l, long l1) { + //do nothing + } + + @Override + public void onStallWarning(StallWarning stallWarning) { + //do nothing + } + + @Override + public void onException(Exception e) { + //do nothing + } + } + + public static class TweetListener implements StatusListener { + + private LinkedBlockingQueue<String> inputQ; + + public void setInputQ(LinkedBlockingQueue<String> inputQ) { + this.inputQ = inputQ; + } + + @Override + public void onStatus(Status tweet) { + String jsonTweet = TwitterObjectFactory.getRawJSON(tweet); + inputQ.add(jsonTweet); + } + + @Override + public void onException(Exception arg0) { + // do nothing + } + + @Override + public void onDeletionNotice(StatusDeletionNotice arg0) { + // do nothing + } + + @Override + public void onScrubGeo(long arg0, long arg1) { + // do nothing + } + + @Override + public void onStallWarning(StallWarning arg0) { + // do nothing + } + + @Override + public void onTrackLimitationNotice(int arg0) { + // do nothing + } + } + } -- To view, visit https://asterix-gerrit.ics.uci.edu/1272 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Xikui Wang <xkk...@gmail.com>