Github user rbnks commented on a diff in the pull request:
https://github.com/apache/incubator-streams/pull/66#discussion_r16203670
--- Diff:
streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
---
@@ -41,125 +46,126 @@ Licensed to the Apache Software Foundation (ASF) under one
protected static final int MAX_ATTEMPTS = 5;
protected static final int SLEEP_SECS = 5; //5 seconds
- protected Queue dataQueue; //exposed for testing
- private InstagramUserInformationConfiguration config;
- private Instagram instagramClient;
+ protected Queue<MediaFeedData> dataQueue; //exposed for testing
+ private InstagramConfiguration config;
private AtomicBoolean isCompleted;
+ private SimpleTokenManager<InstagramOauthToken> tokenManger;
+ private int consecutiveErrorCount;
+ private BackOffStrategy backOffStrategy;
- public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramUserInformationConfiguration config) {
+ public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramConfiguration config) {
this.dataQueue = queue;
this.config = config;
- this.instagramClient = new Instagram(this.config.getClientId());
this.isCompleted = new AtomicBoolean(false);
+ this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
+ for (String clientId : this.config.getClientIds()) {
+ this.tokenManger.addTokenToPool(new InstagramOauthToken(clientId));
+ }
+ this.consecutiveErrorCount = 0;
+ this.backOffStrategy = new ExponentialBackOffStrategy(2);
}
- /**
- * Set instagram client
- * @param instagramClient
- */
- protected void setInstagramClient(Instagram instagramClient) {
- this.instagramClient = instagramClient;
+
+ @VisibleForTesting
+ protected Instagram getNextInstagramClient() {
+ return new Instagram(this.tokenManger.getNextAvailableToken().getClientId());
}
- /**
- * Gets the user ids from the {@link org.apache.streams.instagram.InstagramUserInformationConfiguration} and
- * converts them to {@link java.lang.Long}
- * @return
- */
- protected Set<Long> getUserIds() {
- Set<Long> userIds = Sets.newHashSet();
- for(String id : config.getUserIds()) {
- try {
- userIds.add(Long.parseLong(id));
- } catch (NumberFormatException nfe) {
- LOGGER.error("Failed to parse user id, {}, to a long : {}", id, nfe.getMessage());
+ private void queueData(MediaFeed userFeed, String userId) {
+ if (userFeed == null) {
+ LOGGER.error("User id, {}, returned a NULL media feed from instagram.", userId);
+ } else {
+ for (MediaFeedData data : userFeed.getData()) {
+ this.dataQueue.offer(data);
--- End diff --
Previously, I had created a util to handle poll and offer failures due to
a thread being locked out of the queue for concurrent utils, but I have
switched to the component utils now that the locks have been removed.
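
For context, the diff above calls this.dataQueue.offer(data) and ignores the boolean it returns. The following is a minimal sketch of what a retrying offer helper might look like. It is not the actual component utils code from the project: the class name QueueOfferSketch, the method offerWithRetry, and its parameters are hypothetical and exist only for illustration. It relies solely on the standard java.util.Queue contract, where offer returns false if the element cannot be added.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

public class QueueOfferSketch {

    /**
     * Hypothetical helper (not part of Apache Streams): offers an item to the
     * queue until it is accepted or maxAttempts is exhausted, sleeping between
     * attempts. Returns true if the item was queued, false otherwise.
     */
    static <T> boolean offerWithRetry(Queue<T> queue, T item, int maxAttempts, long sleepMillis)
            throws InterruptedException {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            // Queue.offer returns false instead of throwing when the item is rejected.
            if (queue.offer(item)) {
                return true;
            }
            Thread.sleep(sleepMillis);
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // A bounded queue with capacity 1 makes the rejection case easy to see.
        Queue<String> bounded = new ArrayBlockingQueue<>(1);
        System.out.println(offerWithRetry(bounded, "first", 3, 10L));  // prints "true"
        System.out.println(offerWithRetry(bounded, "second", 3, 10L)); // prints "false"
    }
}
```

With an unbounded queue, offer always succeeds and the loop exits on the first attempt, so a helper like this only matters when the queue is bounded or can otherwise reject elements.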