Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2261#discussion_r134644962
--- Diff:
storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java ---
@@ -20,30 +20,56 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.task.WorkerTopologyContext;
public class LoadAwareShuffleGrouping implements
LoadAwareCustomStreamGrouping, Serializable {
+ private static final int CAPACITY = 1000;
+
private Random random;
private List<Integer>[] rets;
private int[] targets;
private int[] loads;
- private int total;
- private long lastUpdate = 0;
+ private int[] unassigned;
+ private int[] choices;
+ private int[] prepareChoices;
+ private AtomicInteger current;
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId
stream, List<Integer> targetTasks) {
random = new Random();
+
rets = (List<Integer>[])new List<?>[targetTasks.size()];
targets = new int[targetTasks.size()];
for (int i = 0; i < targets.length; i++) {
rets[i] = Arrays.asList(targetTasks.get(i));
targets[i] = targetTasks.get(i);
}
+
+ // can't leave choices to be empty, so initiate it similar as
ShuffleGrouping
+ choices = new int[CAPACITY];
+
+ for (int i = 0 ; i < CAPACITY ; i++) {
+ choices[i] = i % rets.length;
+ }
+
+ shuffleArray(choices);
+ current = new AtomicInteger(0);
--- End diff --
Logically this should be -1 because we **increment and get**, but it
doesn't hurt much.
(Same applies to ShuffleGrouping)
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---