Github user mfranklin commented on a diff in the pull request:
https://github.com/apache/incubator-streams/pull/66#discussion_r16149886
--- Diff:
streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
---
@@ -41,125 +46,126 @@ Licensed to the Apache Software Foundation (ASF)
under one
protected static final int MAX_ATTEMPTS = 5;
protected static final int SLEEP_SECS = 5; //5 seconds
- protected Queue dataQueue; //exposed for testing
- private InstagramUserInformationConfiguration config;
- private Instagram instagramClient;
+ protected Queue<MediaFeedData> dataQueue; //exposed for testing
+ private InstagramConfiguration config;
private AtomicBoolean isCompleted;
+ private SimpleTokenManager<InstagramOauthToken> tokenManger;
+ private int consecutiveErrorCount;
+ private BackOffStrategy backOffStrategy;
- public InstagramRecentMediaCollector(Queue<MediaFeedData> queue,
InstagramUserInformationConfiguration config) {
+ public InstagramRecentMediaCollector(Queue<MediaFeedData> queue,
InstagramConfiguration config) {
this.dataQueue = queue;
this.config = config;
- this.instagramClient = new Instagram(this.config.getClientId());
this.isCompleted = new AtomicBoolean(false);
+ this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
+ for (String clientId : this.config.getClientIds()) {
+ this.tokenManger.addTokenToPool(new
InstagramOauthToken(clientId));
+ }
+ this.consecutiveErrorCount = 0;
+ this.backOffStrategy = new ExponentialBackOffStrategy(2);
}
- /**
- * Set instagram client
- * @param instagramClient
- */
- protected void setInstagramClient(Instagram instagramClient) {
- this.instagramClient = instagramClient;
+
+ @VisibleForTesting
+ protected Instagram getNextInstagramClient() {
+ return new
Instagram(this.tokenManger.getNextAvailableToken().getClientId());
}
- /**
- * Gets the user ids from the {@link
org.apache.streams.instagram.InstagramUserInformationConfiguration} and
- * converts them to {@link java.lang.Long}
- * @return
- */
- protected Set<Long> getUserIds() {
- Set<Long> userIds = Sets.newHashSet();
- for(String id : config.getUserIds()) {
- try {
- userIds.add(Long.parseLong(id));
- } catch (NumberFormatException nfe) {
- LOGGER.error("Failed to parse user id, {}, to a long :
{}", id, nfe.getMessage());
+ private void queueData(MediaFeed userFeed, String userId) {
+ if (userFeed == null) {
+ LOGGER.error("User id, {}, returned a NULL media feed from
instagram.", userId);
+ } else {
+ for (MediaFeedData data : userFeed.getData()) {
+ this.dataQueue.offer(data);
}
}
- return userIds;
}
/**
- * Determins the course of action to take when Instagram returns an
exception to a request. If it is a rate limit
- * exception, it implements an exponentional back off strategy. If it
is anyother exception, it is logged and
- * rethrown.
- * @param instaExec exception to handle
- * @param attempt number of attempts that have occured to pull this
users information
- * @throws InstagramException
+ * @return true when the collector has queued all of the available
media feed data for the provided users.
*/
- protected void handleInstagramException(InstagramException instaExec,
int attempt) throws InstagramException {
- LOGGER.debug("RemainingApiLimitStatus: {}",
instaExec.getRemainingLimitStatus());
- if(instaExec.getRemainingLimitStatus() == 0) { //rate limit
exception
- long sleepTime = Math.round(Math.pow(SLEEP_SECS, attempt)) *
1000;
- try {
- LOGGER.debug("Encountered rate limit exception, sleeping
for {} ms", sleepTime);
- Thread.sleep(sleepTime);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
+ public boolean isCompleted() {
+ return this.isCompleted.get();
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (UserId user : this.config.getUsersInfo().getUserIds()) {
+ collectMediaFeed(user);
}
- } else {
- LOGGER.error("Instagram returned an excetpion to the user
media request : {}", instaExec.getMessage());
- throw instaExec;
+ } catch (Exception e) {
+ LOGGER.error("Shutting down InstagramCollector. Exception
occured: {}", e.getMessage());
}
+ this.isCompleted.set(true);
}
/**
- * Gets the MediaFeedData for this particular user and adds it to the
share queued.
- * @param userId
+ * Pull Recement Media for a user and queues the resulting data. Will
try a single call 5 times before failing and
+ * moving on to the next call or returning.
+ * @param user
+ * @throws Exception
*/
- private void getUserMedia(Long userId) {
- MediaFeed feed = null;
- int attempts = 0;
- int count = 0;
+ @VisibleForTesting
+ protected void collectMediaFeed(UserId user) throws Exception {
+ Pagination pagination = null;
do {
- ++attempts;
- try {
- feed = this.instagramClient.getRecentMediaFeed(userId);
- queueData(feed, userId);
- count += feed.getData().size();
- while(feed != null && feed.getPagination() != null &&
feed.getPagination().hasNextPage()) {
- feed =
this.instagramClient.getRecentMediaNextPage(feed.getPagination());
- queueData(feed, userId);
- count += feed.getData().size();
- }
- } catch (InstagramException ie) {
+ int attempts = 0;
+ boolean succesfullDataPull = false;
+ while (!succesfullDataPull && attempts < MAX_ATTEMPTS) {
+ ++attempts;
+ MediaFeed feed = null;
try {
- handleInstagramException(ie, attempts);
- } catch (InstagramException ie2) { //not a rate limit
exception, ignore user
- attempts = MAX_ATTEMPTS;
- }
- }
- } while(feed == null && attempts < MAX_ATTEMPTS);
- LOGGER.debug("For user, {}, received {} MediaFeedData", userId,
count);
- }
-
- private void queueData(MediaFeed userFeed, Long userId) {
- if(userFeed == null) {
- LOGGER.error("User id, {}, returned a NULL media feed from
instagram.", userId);
- } else {
- for(MediaFeedData data : userFeed.getData()) {
- synchronized (this.dataQueue) { //unnecessary
- while(!this.dataQueue.offer(data)) {
- Thread.yield();
+ if (pagination == null) {
+ feed =
getNextInstagramClient().getRecentMediaFeed(Long.valueOf(user.getUserId()),
+ 0,
+ null,
+ null,
+ user.getBeforeDate() == null ? null :
user.getBeforeDate().toDate(),
+ user.getAfterDate() == null ? null :
user.getAfterDate().toDate());
+ } else {
+ feed =
getNextInstagramClient().getRecentMediaNextPage(pagination);
+ }
+ } catch (Exception e) {
+ handleException(e);
+ if(e instanceof InstagramBadRequestException) {
--- End diff --
You are checking for InstagramBadRequestException both here and in
handleException, so the check is duplicated.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---