Xikui Wang has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1272

Change subject: Add user-stream for Twitter Adaptor
......................................................................

Add user-stream for Twitter Adaptor

1. Add user-stream option for Twitter Adaptor
2. Refactor part of TwitterRecordReaderFactory
3. To create a user-stream feed, use the following DDL:
  create feed TwitterFeed using twitter_user_stream(
      ("format"="twitter-status"),
      ("type-name"="Tweet"),
      ...
   // the rest is the same as for a push feed

Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
5 files changed, 278 insertions(+), 93 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/72/1272/1

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
index 9ead8a9..c296bc6 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -26,6 +26,8 @@
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.input.record.GenericRecord;
 import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.TwitterUtil;
+import twitter4j.DirectMessage;
 import twitter4j.FilterQuery;
 import twitter4j.StallWarning;
 import twitter4j.Status;
@@ -33,27 +35,43 @@
 import twitter4j.StatusListener;
 import twitter4j.TwitterObjectFactory;
 import twitter4j.TwitterStream;
+import twitter4j.User;
+import twitter4j.UserList;
+import twitter4j.UserStreamListener;
 
 public class TwitterPushRecordReader implements IRecordReader<String> {
     private LinkedBlockingQueue<String> inputQ;
     private TwitterStream twitterStream;
     private GenericRecord<String> record;
+    private StatusListener tweetListener;
     private boolean closed = false;
 
-    public TwitterPushRecordReader(TwitterStream twitterStream, FilterQuery 
query) {
-        record = new GenericRecord<>();
-        inputQ = new LinkedBlockingQueue<>();
-        this.twitterStream = 
twitterStream;//TwitterUtil.getTwitterStream(configuration);
-        this.twitterStream.addListener(new TweetListener(inputQ));
+    public TwitterPushRecordReader(TwitterStream twitterStream, 
TwitterUtil.TweetListener tweetListener,
+            FilterQuery query) {
+        init(twitterStream);
+        tweetListener.setInputQ(inputQ);
+        this.twitterStream.addListener(tweetListener);
         this.twitterStream.filter(query);
     }
 
-    public TwitterPushRecordReader(TwitterStream twitterStream) {
+    public TwitterPushRecordReader(TwitterStream twitterStream, 
TwitterUtil.TweetListener tweetListener) {
+        init(twitterStream);
+        tweetListener.setInputQ(inputQ);
+        this.twitterStream.addListener(tweetListener);
+        twitterStream.sample();
+    }
+
+    public TwitterPushRecordReader(TwitterStream twitterStream, 
TwitterUtil.UserTweetsListener tweetListener) {
+        init(twitterStream);
+        tweetListener.setInputQ(inputQ);
+        this.twitterStream.addListener(tweetListener);
+        twitterStream.user();
+    }
+
+    private void init(TwitterStream twitterStream) {
         record = new GenericRecord<>();
         inputQ = new LinkedBlockingQueue<>();
-        this.twitterStream = twitterStream;//
-        this.twitterStream.addListener(new TweetListener(inputQ));
-        twitterStream.sample();
+        this.twitterStream = twitterStream;
     }
 
     @Override
@@ -89,46 +107,6 @@
             return false;
         }
         return true;
-    }
-
-    private class TweetListener implements StatusListener {
-
-        private LinkedBlockingQueue<String> inputQ;
-
-        public TweetListener(LinkedBlockingQueue<String> inputQ) {
-            this.inputQ = inputQ;
-        }
-
-        @Override
-        public void onStatus(Status tweet) {
-            String jsonTweet = TwitterObjectFactory.getRawJSON(tweet);
-            inputQ.add(jsonTweet);
-        }
-
-        @Override
-        public void onException(Exception arg0) {
-            // do nothing
-        }
-
-        @Override
-        public void onDeletionNotice(StatusDeletionNotice arg0) {
-            // do nothing
-        }
-
-        @Override
-        public void onScrubGeo(long arg0, long arg1) {
-            // do nothing
-        }
-
-        @Override
-        public void onStallWarning(StallWarning arg0) {
-            // do nothing
-        }
-
-        @Override
-        public void onTrackLimitationNotice(int arg0) {
-            // do nothing
-        }
     }
 
     @Override
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 73d1b39..c2af93f 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -73,39 +73,36 @@
             builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
             throw new AsterixException(builder.toString());
         }
-        if (TwitterRecordReaderFactory.isTwitterPull(configuration)) {
-            pull = true;
-            if (configuration.get(SearchAPIConstants.QUERY) == null) {
-                throw new AsterixException(
-                        "parameter " + SearchAPIConstants.QUERY + " not 
specified as part of adaptor configuration");
-            }
-            String interval = configuration.get(SearchAPIConstants.INTERVAL);
-            if (interval != null) {
-                try {
-                    Integer.parseInt(interval);
-                } catch (NumberFormatException nfe) {
-                    throw new IllegalArgumentException(
-                            "parameter " + SearchAPIConstants.INTERVAL + " is 
defined incorrectly, expecting a number");
-                }
-            } else {
-                configuration.put(SearchAPIConstants.INTERVAL, 
DEFAULT_INTERVAL);
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning(" Parameter " + SearchAPIConstants.INTERVAL 
+ " not defined, using default ("
-                            + DEFAULT_INTERVAL + ")");
-                }
-            }
-        } else {
-            pull = false;
-        }
-    }
 
-    public static boolean isTwitterPull(Map<String, String> configuration) {
-        String reader = configuration.get(ExternalDataConstants.KEY_READER);
-        if (reader.equals(ExternalDataConstants.READER_TWITTER_PULL)
-                || reader.equals(ExternalDataConstants.READER_PULL_TWITTER)) {
-            return true;
+        switch (configuration.get(ExternalDataConstants.KEY_READER)) {
+            case ExternalDataConstants.READER_PULL_TWITTER:
+                if (configuration.get(SearchAPIConstants.QUERY) == null) {
+                    throw new AsterixException("parameter " + 
SearchAPIConstants.QUERY
+                            + " not specified as part of adaptor 
configuration");
+                }
+                String interval = 
configuration.get(SearchAPIConstants.INTERVAL);
+                if (interval != null) {
+                    try {
+                        Integer.parseInt(interval);
+                    } catch (NumberFormatException nfe) {
+                        throw new IllegalArgumentException("parameter " + 
SearchAPIConstants.INTERVAL
+                                + " is defined incorrectly, expecting a 
number");
+                    }
+                } else {
+                    configuration.put(SearchAPIConstants.INTERVAL, 
DEFAULT_INTERVAL);
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.warning(" Parameter " + 
SearchAPIConstants.INTERVAL + " not defined, using default ("
+                                + DEFAULT_INTERVAL + ")");
+                    }
+                }
+                break;
+            case ExternalDataConstants.READER_PUSH_TWITTER:
+                // do nothing.
+                break;
+            case ExternalDataConstants.READER_USER_STREAM_TWITTER:
+                // do nothing.
+                break;
         }
-        return false;
     }
 
     @Override
@@ -116,20 +113,34 @@
     @Override
     public IRecordReader<? extends String> 
createRecordReader(IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
-        if (pull) {
-            return new 
TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
-                    configuration.get(SearchAPIConstants.QUERY),
-                    
Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
-        } else {
-            FilterQuery query;
-            try {
-                query = TwitterUtil.getFilterQuery(configuration);
-                return (query == null) ? new 
TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration))
-                        : new 
TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration), query);
-            } catch (AsterixException e) {
-                throw new HyracksDataException(e);
-            }
+        IRecordReader<? extends String> recordReader;
+        switch (configuration.get(ExternalDataConstants.KEY_READER)) {
+            case ExternalDataConstants.READER_PULL_TWITTER:
+                recordReader = new 
TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
+                        configuration.get(SearchAPIConstants.QUERY),
+                        
Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
+                break;
+            case ExternalDataConstants.READER_PUSH_TWITTER:
+                FilterQuery query;
+                try {
+                    query = TwitterUtil.getFilterQuery(configuration);
+                    recordReader = (query == null)
+                            ? new 
TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
+                                    TwitterUtil.getTweetListener())
+                            : new 
TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
+                                    TwitterUtil.getTweetListener(), query);
+                } catch (AsterixException e) {
+                    throw new HyracksDataException(e);
+                }
+                break;
+            case ExternalDataConstants.READER_USER_STREAM_TWITTER:
+                recordReader = new 
TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
+                        TwitterUtil.getUserTweetsListener());
+                break;
+            default:
+                throw new HyracksDataException("No Record reader found!");
         }
+        return recordReader;
     }
 
     @Override
@@ -147,4 +158,5 @@
         }
         return true;
     }
+
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index ad11171..d3faf84 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -101,6 +101,7 @@
             case ExternalDataConstants.READER_TWITTER_PUSH:
             case ExternalDataConstants.READER_PUSH_TWITTER:
             case ExternalDataConstants.READER_PULL_TWITTER:
+            case ExternalDataConstants.READER_USER_STREAM_TWITTER:
                 return new TwitterRecordReaderFactory();
             case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
                 return new StreamRecordReaderFactory(new 
TwitterFirehoseStreamFactory());
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index c5167c1..fa337fa 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -128,6 +128,7 @@
     public static final String READER_PUSH_TWITTER = "push_twitter";
     public static final String READER_TWITTER_PULL = "twitter_pull";
     public static final String READER_PULL_TWITTER = "pull_twitter";
+    public static final String READER_USER_STREAM_TWITTER = 
"twitter_user_stream";
 
     public static final String CLUSTER_LOCATIONS = "cluster-locations";
     public static final String SCHEDULER = "hdfs-scheduler";
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
index 70d31c0..dedba40 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
@@ -19,17 +19,27 @@
 package org.apache.asterix.external.util;
 
 import org.apache.asterix.common.exceptions.AsterixException;
+import twitter4j.DirectMessage;
 import twitter4j.FilterQuery;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
 import twitter4j.Twitter;
 import twitter4j.TwitterFactory;
+import twitter4j.TwitterObjectFactory;
 import twitter4j.TwitterStream;
 import twitter4j.TwitterStreamFactory;
+import twitter4j.User;
+import twitter4j.UserList;
+import twitter4j.UserStreamListener;
 import twitter4j.conf.ConfigurationBuilder;
 
 import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.regex.Matcher;
@@ -275,4 +285,187 @@
         public static final String INTERVAL = "interval";
     }
 
+    public static UserTweetsListener getUserTweetsListener() {
+        return new UserTweetsListener();
+    }
+
+    public static TweetListener getTweetListener() {
+        return new TweetListener();
+    }
+
+    public static class UserTweetsListener implements UserStreamListener {
+
+        private LinkedBlockingQueue<String> inputQ;
+
+        public void setInputQ(LinkedBlockingQueue<String> inputQ) {
+            this.inputQ = inputQ;
+        }
+
+        @Override
+        public void onDeletionNotice(long l, long l1) {
+            //do nothing
+        }
+
+        @Override
+        public void onFriendList(long[] longs) {
+            //do nothing
+        }
+
+        @Override
+        public void onFavorite(User user, User user1, Status status) {
+            //do nothing
+        }
+
+        @Override
+        public void onUnfavorite(User user, User user1, Status status) {
+            //do nothing
+        }
+
+        @Override
+        public void onFollow(User user, User user1) {
+            //do nothing
+        }
+
+        @Override
+        public void onUnfollow(User user, User user1) {
+            //do nothing
+        }
+
+        @Override
+        public void onDirectMessage(DirectMessage directMessage) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListMemberAddition(User user, User user1, UserList 
userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListMemberDeletion(User user, User user1, UserList 
userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListSubscription(User user, User user1, UserList 
userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListUnsubscription(User user, User user1, UserList 
userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListCreation(User user, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListUpdate(User user, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListDeletion(User user, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserProfileUpdate(User user) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserSuspension(long l) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserDeletion(long l) {
+            //do nothing
+        }
+
+        @Override
+        public void onBlock(User user, User user1) {
+            //do nothing
+        }
+
+        @Override
+        public void onUnblock(User user, User user1) {
+            //do nothing
+        }
+
+        @Override
+        public void onStatus(Status status) {
+            String jsonTweet = TwitterObjectFactory.getRawJSON(status);
+            inputQ.add(jsonTweet);
+        }
+
+        @Override
+        public void onDeletionNotice(StatusDeletionNotice 
statusDeletionNotice) {
+            //do nothing
+        }
+
+        @Override
+        public void onTrackLimitationNotice(int i) {
+            //do nothing
+        }
+
+        @Override
+        public void onScrubGeo(long l, long l1) {
+            //do nothing
+        }
+
+        @Override
+        public void onStallWarning(StallWarning stallWarning) {
+            //do nothing
+        }
+
+        @Override
+        public void onException(Exception e) {
+            //do nothing
+        }
+    }
+
+    public static class TweetListener implements StatusListener {
+
+        private LinkedBlockingQueue<String> inputQ;
+
+        public void setInputQ(LinkedBlockingQueue<String> inputQ) {
+            this.inputQ = inputQ;
+        }
+
+        @Override
+        public void onStatus(Status tweet) {
+            String jsonTweet = TwitterObjectFactory.getRawJSON(tweet);
+            inputQ.add(jsonTweet);
+        }
+
+        @Override
+        public void onException(Exception arg0) {
+            // do nothing
+        }
+
+        @Override
+        public void onDeletionNotice(StatusDeletionNotice arg0) {
+            // do nothing
+        }
+
+        @Override
+        public void onScrubGeo(long arg0, long arg1) {
+            // do nothing
+        }
+
+        @Override
+        public void onStallWarning(StallWarning arg0) {
+            // do nothing
+        }
+
+        @Override
+        public void onTrackLimitationNotice(int arg0) {
+            // do nothing
+        }
+    }
+
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xkk...@gmail.com>

Reply via email to