GitHub user rbnks commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/66#discussion_r16203579
  
    --- Diff: 
streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
 ---
    @@ -41,125 +46,126 @@ Licensed to the Apache Software Foundation (ASF) 
under one
         protected static final int MAX_ATTEMPTS = 5;
         protected static final int SLEEP_SECS = 5; //5 seconds
     
    -    protected Queue dataQueue; //exposed for testing
    -    private InstagramUserInformationConfiguration config;
    -    private Instagram instagramClient;
    +    protected Queue<MediaFeedData> dataQueue; //exposed for testing
    +    private InstagramConfiguration config;
         private AtomicBoolean isCompleted;
    +    private SimpleTokenManager<InstagramOauthToken> tokenManger;
    +    private int consecutiveErrorCount;
    +    private BackOffStrategy backOffStrategy;
     
     
    -    public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, 
InstagramUserInformationConfiguration config) {
    +    public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, 
InstagramConfiguration config) {
             this.dataQueue = queue;
             this.config = config;
    -        this.instagramClient = new Instagram(this.config.getClientId());
             this.isCompleted = new AtomicBoolean(false);
    +        this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
    +        for (String clientId : this.config.getClientIds()) {
    +            this.tokenManger.addTokenToPool(new 
InstagramOauthToken(clientId));
    +        }
    +        this.consecutiveErrorCount = 0;
    +        this.backOffStrategy = new ExponentialBackOffStrategy(2);
         }
     
    -    /**
    -     * Set instagram client
    -     * @param instagramClient
    -     */
    -    protected void setInstagramClient(Instagram instagramClient) {
    -        this.instagramClient = instagramClient;
    +
    +    @VisibleForTesting
    +    protected Instagram getNextInstagramClient() {
    +        return new 
Instagram(this.tokenManger.getNextAvailableToken().getClientId());
         }
     
    -    /**
    -     * Gets the user ids from the {@link 
org.apache.streams.instagram.InstagramUserInformationConfiguration} and
    -     * converts them to {@link java.lang.Long}
    -     * @return
    -     */
    -    protected Set<Long> getUserIds() {
    -        Set<Long> userIds = Sets.newHashSet();
    -        for(String id : config.getUserIds()) {
    -            try {
    -                userIds.add(Long.parseLong(id));
    -            } catch (NumberFormatException nfe) {
    -                LOGGER.error("Failed to parse user id, {}, to a long : 
{}", id, nfe.getMessage());
    +    private void queueData(MediaFeed userFeed, String userId) {
    +        if (userFeed == null) {
    +            LOGGER.error("User id, {}, returned a NULL media feed from 
instagram.", userId);
    +        } else {
    +            for (MediaFeedData data : userFeed.getData()) {
    +                this.dataQueue.offer(data);
                 }
             }
    -        return userIds;
         }
     
         /**
    -     * Determins the course of action to take when Instagram returns an 
exception to a request.  If it is a rate limit
    -     * exception, it implements an exponentional back off strategy.  If it 
is anyother exception, it is logged and
    -     * rethrown.
    -     * @param instaExec exception to handle
    -     * @param attempt number of attempts that have occured to pull this 
users information
    -     * @throws InstagramException
    +     * @return true when the collector has queued all of the available 
media feed data for the provided users.
          */
    -    protected void handleInstagramException(InstagramException instaExec, 
int attempt) throws InstagramException {
    -        LOGGER.debug("RemainingApiLimitStatus: {}", 
instaExec.getRemainingLimitStatus());
    -        if(instaExec.getRemainingLimitStatus() == 0) { //rate limit 
exception
    -            long sleepTime = Math.round(Math.pow(SLEEP_SECS, attempt)) * 
1000;
    -            try {
    -                LOGGER.debug("Encountered rate limit exception, sleeping 
for {} ms", sleepTime);
    -                Thread.sleep(sleepTime);
    -            } catch (InterruptedException ie) {
    -                Thread.currentThread().interrupt();
    +    public boolean isCompleted() {
    +        return this.isCompleted.get();
    +    }
    +
    +    @Override
    +    public void run() {
    +        try {
    +            for (UserId user : this.config.getUsersInfo().getUserIds()) {
    +                collectMediaFeed(user);
                 }
    -        } else {
    -            LOGGER.error("Instagram returned an excetpion to the user 
media request : {}", instaExec.getMessage());
    -            throw instaExec;
    +        } catch (Exception e) {
    +            LOGGER.error("Shutting down InstagramCollector. Exception 
occured: {}", e.getMessage());
             }
    +        this.isCompleted.set(true);
         }
     
         /**
    -     * Gets the MediaFeedData for this particular user and adds it to the 
share queued.
    -     * @param userId
    +     * Pull Recement Media for a user and queues the resulting data. Will 
try a single call 5 times before failing and
    +     * moving on to the next call or returning.
    +     * @param user
    +     * @throws Exception
          */
    -    private void getUserMedia(Long userId) {
    -        MediaFeed feed = null;
    -        int attempts = 0;
    -        int count = 0;
    +    @VisibleForTesting
    +    protected void collectMediaFeed(UserId user) throws Exception {
    +        Pagination pagination = null;
             do {
    -            ++attempts;
    -            try {
    -                feed = this.instagramClient.getRecentMediaFeed(userId);
    -                queueData(feed, userId);
    -                count += feed.getData().size();
    -                while(feed != null && feed.getPagination() != null && 
feed.getPagination().hasNextPage()) {
    -                    feed = 
this.instagramClient.getRecentMediaNextPage(feed.getPagination());
    -                    queueData(feed, userId);
    -                    count += feed.getData().size();
    -                }
    -            } catch (InstagramException ie) {
    +            int attempts = 0;
    +            boolean succesfullDataPull = false;
    +            while (!succesfullDataPull && attempts < MAX_ATTEMPTS) {
    +                ++attempts;
    +                MediaFeed feed = null;
                     try {
    -                    handleInstagramException(ie, attempts);
    -                } catch (InstagramException ie2) { //not a rate limit 
exception, ignore user
    -                    attempts = MAX_ATTEMPTS;
    -                }
    -            }
    -        } while(feed == null && attempts < MAX_ATTEMPTS);
    -        LOGGER.debug("For user, {}, received {} MediaFeedData", userId, 
count);
    -    }
    -
    -    private void queueData(MediaFeed userFeed, Long userId) {
    -        if(userFeed == null) {
    -            LOGGER.error("User id, {}, returned a NULL media feed from 
instagram.", userId);
    -        } else {
    -            for(MediaFeedData data : userFeed.getData()) {
    -                synchronized (this.dataQueue) { //unnecessary
    -                    while(!this.dataQueue.offer(data)) {
    -                        Thread.yield();
    +                    if (pagination == null) {
    +                        feed = 
getNextInstagramClient().getRecentMediaFeed(Long.valueOf(user.getUserId()),
    +                                0,
    +                                null,
    +                                null,
    +                                user.getBeforeDate() == null ? null : 
user.getBeforeDate().toDate(),
    +                                user.getAfterDate() == null ? null : 
user.getAfterDate().toDate());
    +                    } else {
    +                        feed = 
getNextInstagramClient().getRecentMediaNextPage(pagination);
    +                    }
    +                } catch (Exception e) {
    +                    handleException(e);
    +                    if(e instanceof InstagramBadRequestException) {
    --- End diff --
    
    fixed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to