Repository: incubator-streams
Updated Branches:
  refs/heads/master 8bb4ca8a6 -> 4febde277


related to STREAMS-403


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9bf8ef9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9bf8ef9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9bf8ef9b

Branch: refs/heads/master
Commit: 9bf8ef9ba566351a855366875f0253059c0473ed
Parents: 8bb4ca8
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Authored: Tue Oct 4 15:06:14 2016 -0500
Committer: Steve Blackmon @steveblackmon <sblack...@apache.org>
Committed: Tue Oct 4 15:11:44 2016 -0500

----------------------------------------------------------------------
 .../twitter/provider/TwitterErrorHandler.java   |  28 +++--
 .../provider/TwitterFollowingProvider.java      |   9 +-
 .../provider/TwitterFollowingProviderTask.java  |   8 +-
 .../provider/TwitterTimelineProvider.java       |   8 +-
 .../provider/TwitterTimelineProviderTask.java   |  20 +++-
 .../TwitterUserInformationProvider.java         | 103 ++++++++++++++-----
 .../src/main/jsonschema/com/twitter/Follow.json |   1 +
 .../com/twitter/TwitterConfiguration.json       |  10 ++
 8 files changed, 142 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
index 51236ba..90f6b62 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
@@ -18,6 +18,9 @@
 
 package org.apache.streams.twitter.provider;
 
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.twitter.TwitterConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import twitter4j.Twitter;
@@ -32,11 +35,18 @@ public class TwitterErrorHandler
     private final static Logger LOGGER = 
LoggerFactory.getLogger(TwitterErrorHandler.class);
 
     // selected because 3 * 5 + n >= 15 for positive n
-    protected static final long retry = 3*60*1000;
+    protected static long retry =
+            new 
ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration(
+                    StreamsConfigurator.getConfig().getConfig("twitter")
+            ).getRetrySleepMs();
+    protected static long retryMax =
+            new 
ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration(
+                    StreamsConfigurator.getConfig().getConfig("twitter")
+            ).getRetryMax();
 
     @Deprecated
     public static int handleTwitterError(Twitter twitter, Exception exception) 
{
-        return handleTwitterError(twitter, null, exception);
+        return handleTwitterError( twitter, null, exception);
     }
 
     public static int handleTwitterError(Twitter twitter, Long id, Exception 
exception)
@@ -82,11 +92,11 @@ public class TwitterErrorHandler
                         LOGGER.warn("User does not exist: {}", id);
                     else
                         LOGGER.warn("User does not exist");
-                    return 100;
+                    return (int)retryMax;
                 }
                 else
                 {
-                    return 1;
+                    return (int)retryMax/3;
                 }
             }
             else
@@ -94,7 +104,7 @@ public class TwitterErrorHandler
                 if(e.getExceptionCode().equals("ced778ef-0c669ac0"))
                 {
                     // This is a known weird issue, not exactly sure the 
cause, but you'll never be able to get the data.
-                    return 5;
+                    return (int)retryMax/3;
                 }
                 else if(e.getExceptionCode().equals("4be80492-0a7bf7c7")) {
                     // This is a 401 reflecting credentials don't have access 
to the requested resource.
@@ -102,7 +112,7 @@ public class TwitterErrorHandler
                         LOGGER.warn("Authentication Exception accessing id: 
{}", id);
                     else
                         LOGGER.warn("Authentication Exception");
-                    return 5;
+                    return (int)retryMax;
                 }
                 else
                 {
@@ -111,19 +121,19 @@ public class TwitterErrorHandler
                     LOGGER.warn("   Access: {}", e.getAccessLevel());
                     LOGGER.warn("     Code: {}", e.getExceptionCode());
                     LOGGER.warn("  Message: {}", e.getLocalizedMessage());
-                    return 1;
+                    return (int)retryMax/10;
                 }
             }
         }
         else if(exception instanceof RuntimeException)
         {
             LOGGER.warn("TwitterGrabber: Unknown Runtime Error", 
exception.getMessage());
-            return 1;
+            return (int)retryMax/3;
         }
         else
         {
             LOGGER.info("Completely Unknown Exception: {}", exception);
-            return 1;
+            return (int)retryMax/3;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index dc15407..27c8526 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import twitter4j.Twitter;
 
 import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -57,6 +58,7 @@ public class TwitterFollowingProvider extends 
TwitterUserInformationProvider {
     }
 
     public TwitterFollowingProvider(TwitterFollowingConfiguration config) {
+        super(config);
         this.config = config;
     }
 
@@ -130,7 +132,7 @@ public class TwitterFollowingProvider extends 
TwitterUserInformationProvider {
     }
 
     protected Queue<StreamsDatum> constructQueue() {
-        return Queues.synchronizedQueue(new 
LinkedBlockingQueue<StreamsDatum>(MAX_NUMBER_WAITING));
+        return new ConcurrentLinkedQueue<StreamsDatum>();
     }
 
     @Override
@@ -149,4 +151,9 @@ public class TwitterFollowingProvider extends 
TwitterUserInformationProvider {
             lock.readLock().unlock();
         }
     }
+
+    @Override
+    public boolean isRunning() {
+        return running.get();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
index 5397757..cc71d48 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
@@ -139,7 +139,7 @@ public class TwitterFollowingProviderTask implements 
Runnable {
                         Preconditions.checkNotNull(follow);
 
                         if( count < provider.getConfig().getMaxItems()) {
-                            provider.addDatum(new StreamsDatum(follow));
+                            ComponentUtils.offerUntilSuccess(new 
StreamsDatum(follow), provider.providerQueue);
                             count++;
                         }
 
@@ -157,7 +157,7 @@ public class TwitterFollowingProviderTask implements 
Runnable {
             catch(Exception e) {
                 keepTrying += TwitterErrorHandler.handleTwitterError(client, 
e);
             }
-        } while (curser != 0 && keepTrying < 10 && count < 
provider.getConfig().getMaxItems());
+        } while (curser != 0 && keepTrying < 
provider.getConfig().getRetryMax() && count < 
provider.getConfig().getMaxItems());
     }
 
     private void collectIds(Long id) {
@@ -196,7 +196,7 @@ public class TwitterFollowingProviderTask implements 
Runnable {
                         Preconditions.checkNotNull(follow);
 
                         if( count < provider.getConfig().getMaxItems()) {
-                            provider.addDatum(new StreamsDatum(follow));
+                            ComponentUtils.offerUntilSuccess(new 
StreamsDatum(follow), provider.providerQueue);
                             count++;
                         }
                     } catch (Exception e) {
@@ -213,7 +213,7 @@ public class TwitterFollowingProviderTask implements 
Runnable {
             catch(Exception e) {
                 keepTrying += TwitterErrorHandler.handleTwitterError(client, 
e);
             }
-        } while (curser != 0 && keepTrying < 10 && count < 
provider.getConfig().getMaxItems());
+        } while (curser != 0 && keepTrying < 
provider.getConfig().getRetryMax() && count < 
provider.getConfig().getMaxItems());
     }
 
     protected void getFollowing(String screenName) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 26ba887..a8eada4 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -109,7 +109,7 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
 
         Preconditions.checkArgument(!ids.isEmpty());
 
-        LOGGER.info("readCurrent");
+        LOGGER.debug("{} - readCurrent", ids);
 
         submitTimelineThreads(ids.toArray(new Long[0]));
 
@@ -150,10 +150,10 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
 
     public StreamsResultSet readCurrent() {
 
-        LOGGER.info("Providing {} docs", providerQueue.size());
-
         StreamsResultSet result;
 
+        LOGGER.info("Providing {} docs", providerQueue.size());
+
         try {
             lock.writeLock().lock();
             result = new StreamsResultSet(providerQueue);
@@ -176,7 +176,7 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
     }
 
     protected Queue<StreamsDatum> constructQueue() {
-        return Queues.synchronizedQueue(new 
LinkedBlockingQueue<StreamsDatum>(MAX_NUMBER_WAITING));
+        return new LinkedBlockingQueue<StreamsDatum>();
     }
 
     public StreamsResultSet readNew(BigInteger sequence) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index adc37ca..b8d5e1d 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -18,10 +18,17 @@
 
 package org.apache.streams.twitter.provider;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.*;
+import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import twitter4j.*;
+import twitter4j.Status;
 
 import java.util.List;
 
@@ -32,6 +39,8 @@ public class TwitterTimelineProviderTask implements Runnable {
 
     private final static Logger LOGGER = 
LoggerFactory.getLogger(TwitterTimelineProviderTask.class);
 
+    private static ObjectMapper MAPPER = new 
StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
     protected TwitterTimelineProvider provider;
     protected Twitter client;
     protected Long id;
@@ -49,6 +58,8 @@ public class TwitterTimelineProviderTask implements Runnable {
         List<Status> statuses = null;
         int count = 0;
 
+        LOGGER.info(id + " Thread Starting");
+
         do
         {
             int keepTrying = 0;
@@ -67,10 +78,15 @@ public class TwitterTimelineProviderTask implements 
Runnable {
                     statuses = client.getUserTimeline(id, paging);
 
                     for (Status tStat : statuses) {
-                        String json = TwitterObjectFactory.getRawJSON(tStat);
 
+                        String json = TwitterObjectFactory.getRawJSON(tStat);
                         if( count < provider.getConfig().getMaxItems() ) {
-                            provider.addDatum(new StreamsDatum(json));
+                            try {
+                                org.apache.streams.twitter.pojo.Tweet tweet = 
MAPPER.readValue(json, org.apache.streams.twitter.pojo.Tweet.class);
+                                ComponentUtils.offerUntilSuccess(new 
StreamsDatum(tweet), provider.providerQueue);
+                            } catch(Exception exception) {
+                                LOGGER.warn("Failed to read document as Tweet 
", tStat);
+                            }
                             count++;
                         }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index c4cc96d..78eb3e6 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -18,17 +18,24 @@
 
 package org.apache.streams.twitter.provider;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.TwitterFollowingConfiguration;
 import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.User;
 import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -36,7 +43,6 @@ import org.slf4j.LoggerFactory;
 import twitter4j.Twitter;
 import twitter4j.TwitterException;
 import twitter4j.TwitterFactory;
-import twitter4j.User;
 import twitter4j.conf.ConfigurationBuilder;
 import twitter4j.json.DataObjectFactory;
 
@@ -48,17 +54,27 @@ import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
 
 public class TwitterUserInformationProvider implements StreamsProvider, 
Serializable
 {
 
     public static final String STREAMS_ID = "TwitterUserInformationProvider";
 
+    private static ObjectMapper MAPPER = new 
StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TwitterUserInformationProvider.class);
 
+    public static final int MAX_NUMBER_WAITING = 1000;
+
     private TwitterUserInformationConfiguration config;
 
-    protected volatile Queue<StreamsDatum> providerQueue = new 
LinkedBlockingQueue<StreamsDatum>();
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    protected volatile Queue<StreamsDatum> providerQueue;
 
     public TwitterUserInformationConfiguration getConfig()              { 
return config; }
 
@@ -81,7 +97,7 @@ public class TwitterUserInformationProvider implements 
StreamsProvider, Serializ
     }
 
     public TwitterUserInformationProvider() {
-        this.config = new 
ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter"));
+        this.config = new 
ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter"));
     }
 
     public TwitterUserInformationProvider(TwitterUserInformationConfiguration 
config) {
@@ -99,7 +115,20 @@ public class TwitterUserInformationProvider implements 
StreamsProvider, Serializ
 
     @Override
     public void startStream() {
+
+        Preconditions.checkArgument(idsBatches.hasNext() || 
screenNameBatches.hasNext());
+
+        LOGGER.info("{}{} - startStream", idsBatches, screenNameBatches);
+
+        while(idsBatches.hasNext())
+            loadBatch(idsBatches.next());
+
+        while(screenNameBatches.hasNext())
+            loadBatch(screenNameBatches.next());
+
         running.set(true);
+
+        executor.shutdown();
     }
 
     protected void loadBatch(Long[] ids) {
@@ -116,9 +145,14 @@ public class TwitterUserInformationProvider implements 
StreamsProvider, Serializ
                 for(int i = 0; i < ids.length; i++)
                     toQuery[i] = ids[i];
 
-                for (User tStat : client.lookupUsers(toQuery)) {
-                    String json = DataObjectFactory.getRawJSON(tStat);
-                    ComponentUtils.offerUntilSuccess(new StreamsDatum(json), 
providerQueue);
+                for (twitter4j.User tUser : client.lookupUsers(toQuery)) {
+                    String json = DataObjectFactory.getRawJSON(tUser);
+                    try {
+                        User user = MAPPER.readValue(json, 
org.apache.streams.twitter.pojo.User.class);
+                        ComponentUtils.offerUntilSuccess(new 
StreamsDatum(user), providerQueue);
+                    } catch(Exception exception) {
+                        LOGGER.warn("Failed to read document as User ", tUser);
+                    }
                 }
                 keepTrying = 10;
             }
@@ -141,9 +175,14 @@ public class TwitterUserInformationProvider implements 
StreamsProvider, Serializ
         {
             try
             {
-                for (User tStat : client.lookupUsers(ids)) {
-                    String json = DataObjectFactory.getRawJSON(tStat);
-                    providerQueue.offer(new StreamsDatum(json));
+                for (twitter4j.User tUser : client.lookupUsers(ids)) {
+                    String json = DataObjectFactory.getRawJSON(tUser);
+                    try {
+                        User user = MAPPER.readValue(json, 
org.apache.streams.twitter.pojo.User.class);
+                        ComponentUtils.offerUntilSuccess(new 
StreamsDatum(user), providerQueue);
+                    } catch(Exception exception) {
+                        LOGGER.warn("Failed to read document as User ", tUser);
+                    }
                 }
                 keepTrying = 10;
             }
@@ -158,30 +197,34 @@ public class TwitterUserInformationProvider implements 
StreamsProvider, Serializ
 
     public StreamsResultSet readCurrent() {
 
-        Preconditions.checkArgument(idsBatches.hasNext() || 
screenNameBatches.hasNext());
-
-        LOGGER.info("readCurrent");
-
-        while(idsBatches.hasNext())
-            loadBatch(idsBatches.next());
-
-        while(screenNameBatches.hasNext())
-            loadBatch(screenNameBatches.next());
-
+        LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches);
 
-        LOGGER.info("Finished.  Cleaning up...");
+        StreamsResultSet result;
 
-        LOGGER.info("Providing {} docs", providerQueue.size());
+        try {
+            lock.writeLock().lock();
+            result = new StreamsResultSet(providerQueue);
+            result.setCounter(new DatumStatusCounter());
+            providerQueue = constructQueue();
+            LOGGER.info("{}{} - providing {} docs", idsBatches, 
screenNameBatches, result.size());
+        } finally {
+            lock.writeLock().unlock();
+        }
 
-        StreamsResultSet result =  new StreamsResultSet(providerQueue);
-        running.set(false);
+        if( providerQueue.isEmpty() && executor.isTerminated()) {
+            LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
 
-        LOGGER.info("Exiting");
+            running.set(false);
+        }
 
         return result;
 
     }
 
+    protected Queue<StreamsDatum> constructQueue() {
+        return new LinkedBlockingQueue<StreamsDatum>();
+    }
+
     public StreamsResultSet readNew(BigInteger sequence) {
         LOGGER.debug("{} readNew", STREAMS_ID);
         throw new NotImplementedException();
@@ -225,6 +268,13 @@ public class TwitterUserInformationProvider implements 
StreamsProvider, Serializ
         if( o instanceof TwitterFollowingConfiguration )
             config = (TwitterUserInformationConfiguration) o;
 
+        try {
+            lock.writeLock().lock();
+            providerQueue = constructQueue();
+        } finally {
+            lock.writeLock().unlock();
+        }
+
         Preconditions.checkNotNull(providerQueue);
         Preconditions.checkNotNull(config.getOauth().getConsumerKey());
         Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
@@ -276,7 +326,10 @@ public class TwitterUserInformationProvider implements 
StreamsProvider, Serializ
         if(screenNames.size() > 0)
             screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
 
-        executor = 
MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() 
+ screenNames.size())));
+        if(ids.size() + screenNames.size() > 0)
+            executor = 
MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() 
+ screenNames.size())));
+        else
+            executor = 
MoreExecutors.listeningDecorator(newSingleThreadExecutor());
 
         this.idsBatches = idsBatches.iterator();
         this.screenNameBatches = screenNameBatches.iterator();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
 
b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
index b667540..320db12 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
+++ 
b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
@@ -6,6 +6,7 @@
     ],
     "id": "#",
     "javaType" : "org.apache.streams.twitter.pojo.Follow",
+    "javaInterfaces": ["java.io.Serializable"],
     "properties": {
         "follower": {
             "$ref": "User.json"

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
 
b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
index 5d911af..69048d1 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
+++ 
b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
@@ -72,6 +72,16 @@
                     "type": "string"
                 }
             }
+        },
+        "retrySleepMs": {
+             "type": "integer",
+             "description": "ms to sleep when hitting a rate limit",
+             "default": 100000
+         },
+         "retryMax": {
+             "type": "integer",
+             "description": "retry budget: a task gives up once its accumulated error count reaches this value",
+             "default": 10
         }
    }
 }
\ No newline at end of file

Reply via email to