STREAMS 121 | Added instagram provider and tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/77603934 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/77603934 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/77603934 Branch: refs/heads/STREAMS-46 Commit: 77603934e41ac78ba68f73b3f80ea6852f0acb46 Parents: 815ce2a Author: rebanks <[email protected]> Authored: Fri Jul 11 16:09:59 2014 -0500 Committer: rebanks <[email protected]> Committed: Fri Jul 11 16:09:59 2014 -0500 ---------------------------------------------------------------------- streams-contrib/pom.xml | 1 + .../streams-provider-instagram/pom.xml | 6 + .../instagram/InstagramConfigurator.java | 15 +- .../provider/InstagramRecentMediaCollector.java | 120 ++++++ .../provider/InstagramRecentMediaProvider.java | 97 +++++ .../provider/InstagramTimelineProvider.java | 409 ------------------- .../com/instagram/InstagramConfiguration.json | 13 +- .../InstagramUserInformationConfiguration.json | 2 +- .../InstagramRecentMediaCollectorTest.java | 161 ++++++++ .../InstagramRecentMediaProviderTest.java | 145 +++++++ .../test/InstagramActivitySerDeTest.java | 92 ----- 11 files changed, 544 insertions(+), 517 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml index 620f68e..b06c276 100644 --- a/streams-contrib/pom.xml +++ b/streams-contrib/pom.xml @@ -58,6 +58,7 @@ <module>streams-provider-sysomos</module> <module>streams-provider-rss</module> <module>streams-processor-regex</module> + <module>streams-provider-instagram</module> </modules> <dependencyManagement> 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/pom.xml b/streams-contrib/streams-provider-instagram/pom.xml index e356a7a..a96ff1c 100644 --- a/streams-contrib/streams-provider-instagram/pom.xml +++ b/streams-contrib/streams-provider-instagram/pom.xml @@ -76,6 +76,12 @@ <version>1.3</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.9.5</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java index f771856..4d01605 100644 --- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java +++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java @@ -25,9 +25,6 @@ import com.typesafe.config.ConfigRenderOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.validation.Validation; -import javax.validation.Validator; -import javax.validation.ValidatorFactory; import java.io.IOException; /** @@ -41,8 +38,8 @@ public class InstagramConfigurator { public static InstagramConfiguration detectInstagramConfiguration(Config config) { - ValidatorFactory factory = Validation.buildDefaultValidatorFactory(); - Validator validator = factory.getValidator(); 
+// ValidatorFactory factory = Validation.buildDefaultValidatorFactory(); +// Validator validator = factory.getValidator(); InstagramConfiguration instagramConfiguration = null; try { @@ -52,15 +49,15 @@ public class InstagramConfigurator { } Preconditions.checkNotNull(instagramConfiguration); - Preconditions.checkState(validator.validate(instagramConfiguration).size() == 0); +// Preconditions.checkState(validator.validate(instagramConfiguration).size() == 0); return instagramConfiguration; } public static InstagramUserInformationConfiguration detectInstagramUserInformationConfiguration(Config config) { - ValidatorFactory factory = Validation.buildDefaultValidatorFactory(); - Validator validator = factory.getValidator(); +// ValidatorFactory factory = Validation.buildDefaultValidatorFactory(); +// Validator validator = factory.getValidator(); InstagramUserInformationConfiguration instagramConfiguration = null; try { @@ -70,7 +67,7 @@ public class InstagramConfigurator { } Preconditions.checkNotNull(instagramConfiguration); - Preconditions.checkState(validator.validate(instagramConfiguration).size() == 0); +// Preconditions.checkState(validator.validate(instagramConfiguration).size() == 0); return instagramConfiguration; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java new file mode 100644 index 0000000..7eb3fcd --- /dev/null +++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java @@ -0,0 +1,120 
package org.apache.streams.instagram.provider;

import com.google.common.collect.Sets;
import org.apache.streams.instagram.InstagramUserInformationConfiguration;
import org.jinstagram.Instagram;
import org.jinstagram.entity.users.feed.MediaFeed;
import org.jinstagram.entity.users.feed.MediaFeedData;
import org.jinstagram.exceptions.InstagramException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Queue;
import java.util.Set;

/**
 * Pulls the recent media of every configured user from Instagram and offers each
 * {@link MediaFeedData} item to a shared queue.
 *
 * <p>An instance is a one-shot {@link Runnable}: {@link #run()} walks the configured user ids
 * once and then flips {@link #isCompleted()} to {@code true}. Every queue access made here is
 * guarded by the queue's own monitor, so a consumer that drains the queue while holding the
 * same monitor never interleaves with a single {@code offer}.
 */
public class InstagramRecentMediaCollector implements Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollector.class);
    protected static final int MAX_ATTEMPTS = 5;
    protected static final int SLEEP_SECS = 5; //5 seconds

    protected Queue<MediaFeedData> dataQueue; //exposed for testing
    private final InstagramUserInformationConfiguration config;
    private Instagram instagramClient;
    private volatile boolean isCompleted;

    /**
     * Creates a collector that writes into {@code queue} and talks to Instagram with the client
     * id found in {@code config}.
     *
     * @param queue  destination for every media item that is collected
     * @param config supplies the client id and the user ids to collect
     */
    public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramUserInformationConfiguration config) {
        this.dataQueue = queue;
        this.config = config;
        this.instagramClient = new Instagram(this.config.getClientId());
        this.isCompleted = false;
    }

    /**
     * Replaces the Instagram client; lets tests inject a mock.
     *
     * @param instagramClient client used for all subsequent requests
     */
    protected void setInstagramClient(Instagram instagramClient) {
        this.instagramClient = instagramClient;
    }

    /**
     * Converts the configured user ids to {@code Long}s. Ids that are not valid longs are
     * logged and left out; duplicates collapse because the result is a set.
     *
     * @return the parseable user ids, never {@code null}
     */
    protected Set<Long> getUserIds() {
        Set<Long> userIds = Sets.newHashSet();
        if (config.getUserIds() == null) { // no ids configured: nothing to collect
            return userIds;
        }
        for (String id : config.getUserIds()) {
            try {
                userIds.add(Long.parseLong(id));
            } catch (NumberFormatException nfe) {
                LOGGER.error("Failed to parse user id, {}, to a long : {}", id, nfe.getMessage());
            }
        }
        return userIds;
    }

    /**
     * Reacts to a failed Instagram request.
     *
     * <p>A remaining-limit status of {@code 0} is treated as a rate limit: the calling thread
     * sleeps for {@code SLEEP_SECS ^ attempt} seconds and the method returns normally so the
     * caller can retry. Any other status is logged and the exception is rethrown.
     *
     * @param instaExec the exception raised by the client
     * @param attempt   1-based number of the attempt that failed; the exponent of the back-off
     * @throws InstagramException {@code instaExec} itself, when it is not a rate limit error
     */
    protected void handleInstagramException(InstagramException instaExec, int attempt) throws InstagramException {
        LOGGER.debug("RemainingApiLimitStatus: {}", instaExec.getRemainingLimitStatus());
        if (instaExec.getRemainingLimitStatus() == 0) { //rate limit exception
            long sleepTime = Math.round(Math.pow(SLEEP_SECS, attempt)) * 1000;
            try {
                LOGGER.debug("Encountered rate limit exception, sleeping for {} ms", sleepTime);
                Thread.sleep(sleepTime);
            } catch (InterruptedException ie) {
                // keep the interrupt visible to whoever owns this thread
                Thread.currentThread().interrupt();
            }
        } else {
            LOGGER.error("Instagram returned an excetpion to the user media request : {}", instaExec.getMessage());
            throw instaExec;
        }
    }

    /**
     * Collects every page of recent media for one user.
     *
     * <p>The request that failed is the one that is retried: a rate limit hit while paging
     * resumes from the last page that was fetched successfully, so pages already queued are not
     * fetched again and the remaining pages are not dropped. At most {@code MAX_ATTEMPTS}
     * passes are made; an error that is not a rate limit abandons the user immediately.
     */
    private void getUserMedia(Long userId) {
        MediaFeed feed = null;
        boolean finished = false;
        int attempts = 0;
        int count = 0;
        while (!finished && attempts < MAX_ATTEMPTS) {
            ++attempts;
            try {
                if (feed == null) {
                    feed = this.instagramClient.getRecentMediaFeed(userId);
                    count += queueData(feed, userId);
                }
                while (hasNextPage(feed)) {
                    MediaFeed next = this.instagramClient.getRecentMediaNextPage(feed.getPagination());
                    count += queueData(next, userId);
                    if (next == null) { // nothing to continue from; keep what was collected
                        break;
                    }
                    feed = next;
                }
                // a null first page is retried, anything else means this user is done
                finished = feed != null;
            } catch (InstagramException ie) {
                try {
                    handleInstagramException(ie, attempts);
                } catch (InstagramException fatal) { //not a rate limit exception, ignore user
                    finished = true;
                }
            }
        }
        LOGGER.debug("For user, {}, received {} MediaFeedData", userId, count);
    }

    /** Tells whether {@code feed} advertises a further page of results. */
    private static boolean hasNextPage(MediaFeed feed) {
        return feed != null && feed.getPagination() != null && feed.getPagination().hasNextPage();
    }

    /**
     * Offers every item of {@code userFeed} to the data queue, retrying an item until the queue
     * accepts it.
     *
     * @return the number of items queued; {@code 0} for a {@code null} feed or a feed without data
     */
    private int queueData(MediaFeed userFeed, Long userId) {
        if (userFeed == null) {
            LOGGER.error("User id, {}, returned a NULL media feed from instagram.", userId);
            return 0;
        }
        if (userFeed.getData() == null) {
            return 0;
        }
        int queued = 0;
        for (MediaFeedData data : userFeed.getData()) {
            while (!offer(data)) {
                // yield without holding the queue monitor so a consumer can drain a full queue
                Thread.yield();
            }
            ++queued;
        }
        return queued;
    }

    /** Makes a single offer attempt while holding the queue's monitor. */
    private boolean offer(MediaFeedData data) {
        synchronized (this.dataQueue) {
            return this.dataQueue.offer(data);
        }
    }

    /**
     * @return {@code true} once {@link #run()} has ended, whether normally or with an exception
     */
    public boolean isCompleted() {
        return this.isCompleted;
    }

    @Override
    public void run() {
        try {
            for (Long userId : getUserIds()) {
                getUserMedia(userId);
            }
        } finally {
            // always signal completion, otherwise a reader polling isCompleted() waits forever
            this.isCompleted = true;
        }
    }
}
package org.apache.streams.instagram.provider;

import com.google.common.collect.Queues;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.instagram.InstagramConfigurator;
import org.apache.streams.instagram.InstagramUserInformationConfiguration;
import org.jinstagram.entity.users.feed.MediaFeedData;
import org.joda.time.DateTime;

import java.math.BigInteger;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * {@link StreamsProvider} that exposes the recent media of the configured Instagram users.
 *
 * <p>{@link #startStream()} hands an {@link InstagramRecentMediaCollector} to a single worker
 * thread; the collector fills {@link #mediaFeedQueue} and {@link #readCurrent()} drains it into
 * {@link StreamsDatum}s. The provider reports itself as finished once the collector has
 * completed and a read comes back empty.
 */
public class InstagramRecentMediaProvider implements StreamsProvider {

    private final InstagramUserInformationConfiguration config;
    private InstagramRecentMediaCollector dataCollector;
    protected Queue<MediaFeedData> mediaFeedQueue; //exposed for testing
    private ExecutorService executorService;
    private volatile boolean isCompleted;

    /**
     * Builds a provider from the {@code instagram} section of the global streams configuration.
     */
    public InstagramRecentMediaProvider() {
        this(InstagramConfigurator.detectInstagramUserInformationConfiguration(StreamsConfigurator.config.getConfig("instagram")));
    }

    /**
     * Builds a provider for an explicit configuration.
     *
     * @param config configuration handed to the collector
     */
    public InstagramRecentMediaProvider(InstagramUserInformationConfiguration config) {
        this.config = config;
        this.mediaFeedQueue = Queues.newConcurrentLinkedQueue();
        this.isCompleted = false;
    }

    /**
     * Creates the collector and starts it on a new single-thread executor.
     */
    @Override
    public void startStream() {
        this.dataCollector = getInstagramRecentMediaCollector();
        this.executorService = Executors.newSingleThreadExecutor();
        this.executorService.submit(this.dataCollector);
    }

    /**
     * Factory for the collector; protected so that a subclass can supply a different one.
     *
     * @return a collector writing into {@link #mediaFeedQueue}
     */
    protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
        return new InstagramRecentMediaCollector(this.mediaFeedQueue, this.config);
    }

    /**
     * Drains everything currently queued into a result set, using each item's id as datum id.
     *
     * <p>Safe to call before {@link #startStream()}: the provider is then simply not completed.
     *
     * @return the drained items, possibly none
     */
    @Override
    public StreamsResultSet readCurrent() {
        // Sample the collector state BEFORE draining: if it was already finished, nothing more
        // can arrive, so an empty drain really means the stream is exhausted. Sampling it after
        // the drain could miss items queued between the two checks.
        final InstagramRecentMediaCollector collector = this.dataCollector;
        final boolean collectorDone = collector != null && collector.isCompleted();

        Queue<StreamsDatum> batch = Queues.newConcurrentLinkedQueue();
        synchronized (this.mediaFeedQueue) {
            MediaFeedData data;
            while ((data = this.mediaFeedQueue.poll()) != null) {
                batch.add(new StreamsDatum(data, data.getId()));
            }
        }
        this.isCompleted = collectorDone && batch.isEmpty();
        return new StreamsResultSet(batch);
    }

    /**
     * Not supported by this provider.
     *
     * @return always {@code null}
     */
    @Override
    public StreamsResultSet readNew(BigInteger sequence) {
        // NOTE(review): null is kept for compatibility; confirm what callers of StreamsProvider expect
        return null;
    }

    /**
     * Not supported by this provider.
     *
     * @return always {@code null}
     */
    @Override
    public StreamsResultSet readRange(DateTime start, DateTime end) {
        // NOTE(review): null is kept for compatibility; confirm what callers of StreamsProvider expect
        return null;
    }

    /**
     * @return {@code false} once a {@link #readCurrent()} call has found the collector completed
     *         and nothing left to read
     */
    @Override
    public boolean isRunning() {
        return !this.isCompleted;
    }

    /** No preparation is required; the configuration is taken in the constructor. */
    @Override
    public void prepare(Object configurationObject) {

    }

    /**
     * Stops the worker thread: waits up to five seconds for the collector to finish and cancels
     * it if it has not. Does nothing when no stream was started or clean up already ran.
     */
    @Override
    public void cleanUp() {
        final ExecutorService executor = this.executorService;
        if (executor == null) {
            return;
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                // do not leave the worker running behind a reference we are about to drop
                executor.shutdownNow();
            }
        } catch (InterruptedException ie) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        } finally {
            this.executorService = null;
        }
    }
}
- */ - -package org.apache.streams.instagram.provider; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Queues; -import com.typesafe.config.Config; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.DatumStatusCounter; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.instagram.InstagramConfigurator; -import org.apache.streams.instagram.InstagramUserInformationConfiguration; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.jinstagram.Instagram; -import org.jinstagram.entity.users.feed.MediaFeedData; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; - -import java.io.Serializable; -import java.math.BigInteger; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -public class InstagramTimelineProvider implements StreamsProvider, Serializable { - - public final static String STREAMS_ID = "InstagramTimelineProvider"; - - private final static Logger LOGGER = LoggerFactory.getLogger(InstagramTimelineProvider.class); - public static final int MAX_NUMBER_WAITING = 10000; - - private static StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance(); - private InstagramUserInformationConfiguration config; - - private Class klass; - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - - public InstagramUserInformationConfiguration getConfig() { - return 
config; - } - - public void setConfig(InstagramUserInformationConfiguration config) { - this.config = config; - } - - protected Iterator<Long[]> idsBatches; - protected Iterator<String[]> screenNameBatches; - - protected volatile Queue<StreamsDatum> providerQueue; - - protected int idsCount; - protected Instagram client; - - - protected ExecutorService executor; - - protected DateTime start; - protected DateTime end; - - protected final AtomicBoolean running = new AtomicBoolean(); - - private static ExecutorService getExecutor() { - return Executors.newSingleThreadExecutor(); - } - - public InstagramTimelineProvider() { - Config config = StreamsConfigurator.config.getConfig("instagram"); - this.config = InstagramConfigurator.detectInstagramUserInformationConfiguration(config); - } - - public InstagramTimelineProvider(InstagramUserInformationConfiguration config) { - this.config = config; - } - - public InstagramTimelineProvider(Class klass) { - Config config = StreamsConfigurator.config.getConfig("instagram"); - this.config = InstagramConfigurator.detectInstagramUserInformationConfiguration(config); - this.klass = klass; - } - - public InstagramTimelineProvider(InstagramUserInformationConfiguration config, Class klass) { - this.config = config; - this.klass = klass; - } - - - public Queue<StreamsDatum> getProviderQueue() { - return this.providerQueue; - } - - @Override - public void startStream() { - LOGGER.debug("{} startStream", STREAMS_ID); - - Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext()); - - LOGGER.info("readCurrent"); - - while(idsBatches.hasNext()) - loadBatch(idsBatches.next()); - - while(screenNameBatches.hasNext()) - loadBatch(screenNameBatches.next()); - - executor.shutdown(); - } - - private void loadBatch(Long[] ids) { - - // twitter4j implementation below - replace with jInstagram - -// Twitter client = getTwitterClient(); -// int keepTrying = 0; -// -// // keep trying to load, give it 5 attempts. 
-// //while (keepTrying < 10) -// while (keepTrying < 1) -// { -// try -// { -// long[] toQuery = new long[ids.length]; -// for(int i = 0; i < ids.length; i++) -// toQuery[i] = ids[i]; -// -// for (User tStat : client.lookupUsers(toQuery)) { -// -// TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId()); -// executor.submit(providerTask); -// -// } -// keepTrying = 10; -// } -// catch(TwitterException twitterException) { -// keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); -// } -// catch(Exception e) { -// keepTrying += TwitterErrorHandler.handleTwitterError(client, e); -// } -// } - } - - private void loadBatch(String[] ids) { - - // twitter4j implementation below - replace with jInstagram -// -// Twitter client = getTwitterClient(); -// int keepTrying = 0; -// -// // keep trying to load, give it 5 attempts. -// //while (keepTrying < 10) -// while (keepTrying < 1) -// { -// try -// { -// for (User tStat : client.lookupUsers(ids)) { -// -// TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId()); -// executor.submit(providerTask); -// -// } -// keepTrying = 10; -// } -// catch(TwitterException twitterException) { -// keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); -// } -// catch(Exception e) { -// keepTrying += TwitterErrorHandler.handleTwitterError(client, e); -// } -// } - } - - public class InstagramTimelineProviderTask implements Runnable { - - // twitter4j implementation below - replace with jInstagram - - private final Logger LOGGER = LoggerFactory.getLogger(InstagramTimelineProvider.class); - - private InstagramTimelineProvider provider; - private Instagram client; - private Long id; - - public InstagramTimelineProviderTask(InstagramTimelineProvider provider, Instagram client, Long id) { - this.provider = provider; - this.client = client; - this.id = id; - } - - @Override - public void run() { - - 
// twitter4j implementation below - replace with jInstagram - -// Paging paging = new Paging(1, 200); -// List<Status> statuses = null; -// boolean KeepGoing = true; -// boolean hadFailure = false; -// -// do -// { -// int keepTrying = 0; -// -// // keep trying to load, give it 5 attempts. -// //This value was chosen because it seemed like a reasonable number of times -// //to retry capturing a timeline given the sorts of errors that could potentially -// //occur (network timeout/interruption, faulty client, etc.) -// while (keepTrying < 5) -// { -// -// try -// { -// statuses = client.getUserTimeline(id, paging); -// -// for (Status tStat : statuses) -// { -// String json = TwitterObjectFactory.getRawJSON(tStat); -// -// try { -// provider.lock.readLock().lock(); -// ComponentUtils.offerUntilSuccess(new StreamsDatum(json), provider.providerQueue); -// } finally { -// provider.lock.readLock().unlock(); -// } -// } -// -// paging.setPage(paging.getPage() + 1); -// -// keepTrying = 10; -// } -// catch(TwitterException twitterException) { -// keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); -// } -// catch(Exception e) { -// keepTrying += TwitterErrorHandler.handleTwitterError(client, e); -// } -// } -// } -// while (provider.shouldContinuePulling(statuses)); - - LOGGER.info(id + " Thread Finished"); - - } - - } - - private Map<Long, Long> userPullInfo; - - protected boolean shouldContinuePulling(List<MediaFeedData> statuses) { - return (statuses != null) && (statuses.size() > 0); - } - - private void sleep() - { - Thread.yield(); - try { - // wait one tenth of a millisecond - Thread.yield(); - Thread.sleep(1); - Thread.yield(); - } - catch(IllegalArgumentException e) { - // passing in static values, this will never happen - } - catch(InterruptedException e) { - // noOp, there must have been an issue sleeping - } - Thread.yield(); - } - - public StreamsResultSet readCurrent() { - - LOGGER.info("Providing {} docs", providerQueue.size()); 
- - StreamsResultSet result; - - try { - lock.writeLock().lock(); - result = new StreamsResultSet(providerQueue); - result.setCounter(new DatumStatusCounter()); - providerQueue = constructQueue(); - } finally { - lock.writeLock().unlock(); - } - - if( providerQueue.isEmpty() && executor.isTerminated()) { - LOGGER.info("Finished. Cleaning up..."); - - running.set(false); - - LOGGER.info("Exiting"); - } - - return result; - - } - - protected Queue<StreamsDatum> constructQueue() { - return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(MAX_NUMBER_WAITING)); - } - - public StreamsResultSet readNew(BigInteger sequence) { - LOGGER.debug("{} readNew", STREAMS_ID); - throw new NotImplementedException(); - } - - public StreamsResultSet readRange(DateTime start, DateTime end) { - LOGGER.debug("{} readRange", STREAMS_ID); - throw new NotImplementedException(); - } - - @Override - public boolean isRunning() { - return running.get(); - } - - void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } - - - @Override - public void prepare(Object o) { - - executor = getExecutor(); - running.set(true); - try { - lock.writeLock().lock(); - providerQueue = constructQueue(); - } finally { - lock.writeLock().unlock(); - } - - Preconditions.checkNotNull(providerQueue); - - Preconditions.checkNotNull(this.klass); - Preconditions.checkNotNull(config.getAccessToken()); - - //idsCount = 
config.getFollow().size(); - - client = getInstagramClient(); - } - - protected Instagram getInstagramClient() - { - // twitter4j -> jInstagram -// String baseUrl = "https://api.instagram.com:443/1.1/"; -// -// ConfigurationBuilder builder = new ConfigurationBuilder() -// .setOAuthConsumerKey(config.getOauth().getConsumerKey()) -// .setOAuthConsumerSecret(config.getOauth().getConsumerSecret()) -// .setOAuthAccessToken(config.getOauth().getAccessToken()) -// .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret()) -// .setIncludeEntitiesEnabled(includeEntitiesEnabled) -// .setJSONStoreEnabled(jsonStoreEnabled) -// .setAsyncNumThreads(3) -// .setRestBaseURL(baseUrl) -// .setIncludeMyRetweetEnabled(Boolean.TRUE) -// .setPrettyDebugEnabled(Boolean.TRUE); -// -// return new InstagramFactory(builder.build()).getInstance(); - return null; - } - - @Override - public void cleanUp() { - shutdownAndAwaitTermination(executor); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json index 18a59b9..f8f8117 100644 --- a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json +++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json @@ -5,16 +5,17 @@ "javaType" : "org.apache.streams.instagram.InstagramConfiguration", "javaInterfaces": ["java.io.Serializable"], "properties": { - "version": { + "clientId": { "type": "string", - "description": "The version" + "description": "Your Instagram Client Id" }, - "endpoint": { + "clientSecret": { "type": "string", 
- "description": "The endpoint" + "description": "Your Instagram Client secret" }, - "accessToken": { - "type": "string" + "callbackUrl": { + "type": "string", + "description": "Your Instagream callback url" } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json index 4b75ee4..b6a8c6b 100644 --- a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json +++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json @@ -6,7 +6,7 @@ "extends": {"$ref":"InstagramConfiguration.json"}, "javaInterfaces": ["java.io.Serializable"], "properties": { - "info": { + "userIds": { "type": "array", "description": "A list of user IDs, indicating the users whose posts should be delivered on the stream", "items": { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java new file mode 100644 index 0000000..39f607c --- /dev/null +++ 
b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
@@ -0,0 +1,161 @@
+package org.apache.streams.instagram.provider;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import org.jinstagram.Instagram;
+import org.jinstagram.entity.common.Pagination;
+import org.jinstagram.entity.users.feed.MediaFeed;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.jinstagram.exceptions.InstagramException;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link org.apache.streams.instagram.provider.InstagramRecentMediaCollector}
+ */
+public class InstagramRecentMediaCollectorTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollectorTest.class);
+
+    private int expectedDataCount = 0;
+    private long randomSeed = System.currentTimeMillis();
+    private Random rand = new Random(randomSeed);
+    private Map<Pagination, MediaFeed> pageMap = Maps.newHashMap();
+    /** With a remaining limit status of 1 the handler must throw an InstagramException with the mock's message. */
+    @Test
+    public void testHandleInstagramException1() throws InstagramException {
+        InstagramException ie = mock(InstagramException.class);
+        when(ie.getRemainingLimitStatus()).thenReturn(1);
+        final String message = "Test Message";
+        when(ie.getMessage()).thenReturn(message);
+        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), new InstagramUserInformationConfiguration());
+        try {
+            collector.handleInstagramException(ie, 1);
+            fail("Expected InstagramException to be thrown");
+        } catch (InstagramException expected) {
+            // Any other exception type propagates and fails the test; only the message is checked here.
+            assertEquals(message, expected.getMessage());
+        }
+    }
+    /** With a remaining limit status of 0 the handler must block: at least 4 s for argument 1, 24 s for argument 2. */
+    @Test
+    public void testHandleInstagramException2() throws InstagramException{
+        InstagramException ie = mock(InstagramException.class);
+        when(ie.getRemainingLimitStatus()).thenReturn(0);
+        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), new InstagramUserInformationConfiguration());
+        long startTime = System.currentTimeMillis();
+        collector.handleInstagramException(ie, 1);
+        long endTime = System.currentTimeMillis();
+        LOGGER.debug("Slept for {} ms", endTime - startTime);
+        assertTrue(endTime - startTime >= 4000); //allow for 1 sec of error
+        startTime = System.currentTimeMillis();
+        collector.handleInstagramException(ie, 2);
+        endTime = System.currentTimeMillis();
+        LOGGER.debug("Slept for {} ms", endTime - startTime);
+        assertTrue(endTime - startTime >= 24000); //allow for 1 sec of error
+    }
+    /** Numeric ids are expected back as Longs; the non-numeric "abcdefg" must not appear in the result. */
+    @Test
+    public void testGetUserIds() {
+        InstagramUserInformationConfiguration config = new InstagramUserInformationConfiguration();
+        List<String> userIds = Lists.newLinkedList();
+        userIds.add("1");
+        userIds.add("2");
+        userIds.add("3");
+        userIds.add("4");
+        userIds.add("abcdefg");
+        config.setUserIds(userIds);
+        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), config);
+
+        Set<Long> expected = Sets.newHashSet();
+        expected.add(1L);
+        expected.add(2L);
+        expected.add(3L);
+        expected.add(4L);
+
+        assertEquals(expected, collector.getUserIds());
+    }
+    /** Runs the collector against the mock client and expects every generated item to reach the queue. */
+    @Test
+    public void testRun() {
+        Queue<MediaFeedData> data = Queues.newConcurrentLinkedQueue();
+        InstagramUserInformationConfiguration config = new InstagramUserInformationConfiguration();
+        List<String> userIds = Lists.newLinkedList();
+        userIds.add("1");
+        userIds.add("2");
+        userIds.add("3");
+        userIds.add("4");
+        config.setUserIds(userIds);
+        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(data, config);
+        collector.setInstagramClient(createMockInstagramClient());
+        collector.run();
+        LOGGER.debug("Random seed == {}", randomSeed);
+        assertEquals("Random Seed == " + randomSeed, this.expectedDataCount, data.size());
+    }
+    /** Builds a mock client: user id 2 throws a mock InstagramException, every other call returns a random feed. */
+    private Instagram createMockInstagramClient() {
+        final Instagram instagramClient = mock(Instagram.class);
+        try {
+            final InstagramException mockException = mock(InstagramException.class);
+            when(mockException.getRemainingLimitStatus()).thenReturn(-1);
+            when(mockException.getMessage()).thenReturn("MockInstagramException message");
+            when(instagramClient.getRecentMediaFeed(any(Long.class))).thenAnswer(new Answer<MediaFeed>() {
+                @Override
+                public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable {
+                    long param = (Long) invocationOnMock.getArguments()[0];
+                    if (param == 2L) {
+                        throw mockException;
+                    } else {
+                        return createRandomMockMediaFeed();
+                    }
+                }
+            });
+            when(instagramClient.getRecentMediaNextPage(any(Pagination.class))).thenAnswer(new Answer<MediaFeed>() {
+                @Override
+                public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable {
+                    return createRandomMockMediaFeed();
+                }
+            });
+        } catch (InstagramException ie) {
+            fail("Failed to create mock instagram client.");
+        }
+        return instagramClient;
+    }
+    /** Creates a mock feed holding 0-99 mock items whose pagination randomly reports a next page or not. */
+    private MediaFeed createRandomMockMediaFeed() throws InstagramException {
+        MediaFeed feed = mock(MediaFeed.class);
+        when(feed.getData()).thenReturn(createData(this.rand.nextInt(100)));
+        Pagination pagination = mock(Pagination.class);
+        if(this.rand.nextInt(2) == 0) {
+            when(pagination.hasNextPage()).thenReturn(true);
+        } else {
+            when(pagination.hasNextPage()).thenReturn(false);
+        }
+        when(feed.getPagination()).thenReturn(pagination);
+        return feed;
+    }
+    /** Creates {@code size} mock items and adds {@code size} to the running expected total. */
+    private List<MediaFeedData> createData(int size) {
+        List<MediaFeedData> data = Lists.newLinkedList();
+        for(int i=0; i < size; ++i) {
+            data.add(mock(MediaFeedData.class));
+        }
+        this.expectedDataCount += size;
+        return data;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
new file mode 100644
index 0000000..7d2a47b
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
@@ -0,0 +1,145 @@
+package org.apache.streams.instagram.provider;
+
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link org.apache.streams.instagram.provider.InstagramRecentMediaProvider}
+ */
+public class InstagramRecentMediaProviderTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaProviderTest.class);
+    /** A collector stub that finishes at once must leave the provider stopped with an empty result set. */
+    @Test
+    public void testStartStream() throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final InstagramRecentMediaCollector collectorStub = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), new InstagramUserInformationConfiguration()) {
+
+            private volatile boolean isFinished = false;
+
+            @Override
+            public void run() {
+                this.isFinished = true;
+                latch.countDown();
+            }
+
+            @Override
+            public boolean isCompleted() {
+                return this.isFinished;
+            }
+        };
+
+        InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(null) {
+            @Override
+            protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+                return collectorStub;
+            }
+        };
+
+        provider.startStream();
+
+        latch.await();
+        assertTrue(collectorStub.isCompleted());
+        StreamsResultSet result = provider.readCurrent();
+        assertNotNull(result);
+        assertEquals(0, result.size());
+        assertFalse(provider.isRunning());
+        try {
+            provider.cleanUp();
+        } catch (Throwable throwable){
+            LOGGER.error("Error during clean up", throwable);
+            fail("Error during clean up");
+        }
+    }
+    /** Steps a producing collector stub and the test in lockstep and compares each read with the batch size. */
+    @Test
+    public void testReadCurrent() {
+        final long seed = System.nanoTime();
+        final Random rand = new Random(seed);
+        final CyclicBarrier test = new CyclicBarrier(2);
+        final CyclicBarrier produce = new CyclicBarrier(2);
+        final AtomicInteger batchCount = new AtomicInteger(0);
+        InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(new InstagramUserInformationConfiguration()) {
+            @Override
+            protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+                return new InstagramRecentMediaCollector(super.mediaFeedQueue, new InstagramUserInformationConfiguration()) {
+
+                    private volatile boolean isFinished = false;
+
+
+
+                    public int getBatchCount() {
+                        return batchCount.get();
+                    }
+
+                    @Override
+                    public boolean isCompleted() {
+                        return isFinished;
+                    }
+
+                    @Override
+                    public void run() {
+                        int randInt = rand.nextInt(5);
+                        while(randInt != 0) {
+                            int batchSize = rand.nextInt(200);
+                            for(int i=0; i < batchSize; ++i) {
+                                while(!super.dataQueue.add(mock(MediaFeedData.class))) {
+                                    Thread.yield();
+                                }
+                            }
+                            batchCount.set(batchSize);
+                            try {
+                                test.await();
+                                produce.await();
+                            } catch (InterruptedException ie ) {
+                                Thread.currentThread().interrupt();
+                            } catch (BrokenBarrierException bbe) {
+                                Thread.currentThread().interrupt();
+                            }
+                            randInt = rand.nextInt(5);
+                        }
+                        batchCount.set(0);
+                        isFinished = true;
+                        try {
+                            test.await();
+                            produce.await();
+                        } catch (InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                        } catch (BrokenBarrierException bbe) {
+                            Thread.currentThread().interrupt();
+                        }
+                    }
+
+                };
+            }
+        };
+        provider.startStream();
+        while(provider.isRunning()) {
+            try {
+                test.await();
+                assertEquals("Seed == "+seed, batchCount.get(), provider.readCurrent().size());
+                produce.await();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                fail("Seed == " + seed + ", interrupted while waiting on a barrier");
+            } catch (BrokenBarrierException bbe) {
+                fail("Seed == " + seed + ", barrier was broken: " + bbe);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
deleted file mode 100644
index fcf5e81..0000000
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.instagram.serializer.InstagramJsonActivitySerializer;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.assertThat;
-
-/**
-* Created with IntelliJ IDEA.
-* User: sblackmon
-* Date: 8/20/13
-* Time: 5:57 PM
-* To change this template use File | Settings | File Templates.
-*/
-public class InstagramActivitySerDeTest {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(InstagramActivitySerDeTest.class);
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    private InstagramJsonActivitySerializer instagramJsonActivitySerializer = new InstagramJsonActivitySerializer();
-
-    // remove @Ignore after implementation
-    @Ignore
-    @Test
-    public void Tests()
-    {
-        InputStream is = InstagramActivitySerDeTest.class.getResourceAsStream("/test.txt");
-        InputStreamReader isr = new InputStreamReader(is);
-        BufferedReader br = new BufferedReader(isr);
-
-        try {
-            while (br.ready()) {
-                String line = br.readLine();
-                if(!StringUtils.isEmpty(line))
-                {
-                    LOGGER.info("raw: {}", line);
-
-                    // convert to MediaFeedData?
-                    Activity activity = instagramJsonActivitySerializer.deserialize(line);
-
-                    String activitystring = mapper.writeValueAsString(activity);
-
-                    LOGGER.info("activity: {}", activitystring);
-
-                    assertThat(activity, is(not(nullValue())));
-
-                    assertThat(activity.getId(), is(not(nullValue())));
-                    assertThat(activity.getActor(), is(not(nullValue())));
-                    assertThat(activity.getActor().getId(), is(not(nullValue())));
-                    assertThat(activity.getVerb(), is(not(nullValue())));
-                    assertThat(activity.getProvider(), is(not(nullValue())));
-
-                }
-            }
-        } catch( Exception e ) {
-            System.out.println(e);
-            e.printStackTrace();
-            Assert.fail();
-        }
-    }
-}
