Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2261#discussion_r134645082
  
    --- Diff: storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java ---
    @@ -52,25 +78,92 @@ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<I
         }
     
         @Override
    +    public void refreshLoad(LoadMapping loadMapping) {
    +        updateRing(loadMapping);
    +    }
    +
    +    @Override
         public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
    -        if ((lastUpdate + 1000) < System.currentTimeMillis()) {
    -            int local_total = 0;
    -            for (int i = 0; i < targets.length; i++) {
    -                int val = (int)(101 - (load.get(targets[i]) * 100));
    -                loads[i] = val;
    -                local_total += val;
    +        int rightNow;
    +        while (true) {
    +            rightNow = current.incrementAndGet();
    +            if (rightNow < CAPACITY) {
    +                return rets[choices[rightNow]];
    +            } else if (rightNow == CAPACITY) {
    +                current.set(0);
    +                return rets[choices[0]];
                 }
    -            total = local_total;
    -            lastUpdate = System.currentTimeMillis();
    +            //race condition with another thread, and we lost
    +            // try again
             }
    -        int selected = random.nextInt(total);
    -        int sum = 0;
    -        for (int i = 0; i < targets.length; i++) {
    -            sum += loads[i];
    -            if (selected < sum) {
    -                return rets[i];
    +    }
    +
    +    private void updateRing(LoadMapping load) {
    +        int localTotal = 0;
    +        for (int i = 0 ; i < targets.length; i++) {
    +            int val = (int)(101 - (load.get(targets[i]) * 100));
    +            loads[i] = val;
    +            localTotal += val;
    +        }
    +
    +        int currentIdx = 0;
    +        int unassignedIdx = 0;
    +        for (int i = 0 ; i < loads.length ; i++) {
    +            if (currentIdx == CAPACITY) {
    +                break;
                 }
    +
    +            int loadForTask = loads[i];
    +            int amount = Math.round(loadForTask * 1.0f * CAPACITY / localTotal);
    +            // assign at least one for task
    +            if (amount == 0) {
    +                unassigned[unassignedIdx++] = i;
    +            }
    +            for (int j = 0; j < amount; j++) {
    +                if (currentIdx == CAPACITY) {
    +                    break;
    +                }
    +
    +                prepareChoices[currentIdx++] = i;
    +            }
    +        }
    +
    +        if (currentIdx < CAPACITY) {
    +            // if there're some rooms, give unassigned tasks a chance to be included
    +            // this should be really small amount, so just add them sequentially
    +            if (unassignedIdx > 0) {
    +                for (int i = currentIdx ; i < CAPACITY ; i++) {
    +                    prepareChoices[i] = unassigned[(i - currentIdx) % unassignedIdx];
    +                }
    +            } else {
    +                // just pick random
    +                for (int i = currentIdx ; i < CAPACITY ; i++) {
    +                    prepareChoices[i] = random.nextInt(loads.length);
    +                }
    +            }
    +        }
    +
    +        shuffleArray(prepareChoices);
    +
    +        // swapping two arrays
    +        int[] tempForSwap = choices;
    +        choices = prepareChoices;
    +        prepareChoices = tempForSwap;
    +
    +        current.set(0);
    --- End diff --
    
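    Again, logically this should be -1: `chooseTasks()` calls **increment and
    get**, so after `current.set(0)` the next call returns index 1 and
    `choices[0]` is skipped on the first pass through the ring. Unlike
    `chooseTasks()`, this method doesn't read the value itself. It doesn't
    hurt much anyway.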
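
The off-by-one can be seen in isolation with a minimal sketch. It assumes a tiny ring (`CAPACITY = 4`) and copies only the index-selection loop from `chooseTasks()` in the diff; the class `RingCounterDemo` and the helpers `nextIndex` and `firstIndexAfterReset` are hypothetical names made up for illustration, not part of the PR.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names and the small CAPACITY are illustrative only.
class RingCounterDemo {
    static final int CAPACITY = 4;

    // Same control flow as the loop in chooseTasks() from the diff,
    // but returning the chosen ring index instead of a task list.
    static int nextIndex(AtomicInteger current) {
        while (true) {
            int rightNow = current.incrementAndGet();
            if (rightNow < CAPACITY) {
                return rightNow;
            } else if (rightNow == CAPACITY) {
                current.set(0);
                return 0;
            }
            // lost a race with another thread; try again
        }
    }

    // Returns the index served by the first call after the counter
    // has been reset to resetValue (as updateRing() does at its end).
    static int firstIndexAfterReset(int resetValue) {
        AtomicInteger current = new AtomicInteger();
        current.set(resetValue);
        return nextIndex(current);
    }

    public static void main(String[] args) {
        // Reset to 0: the first increment-and-get yields 1, so index 0 is skipped.
        System.out.println(firstIndexAfterReset(0));
        // Reset to -1: the first increment-and-get yields 0.
        System.out.println(firstIndexAfterReset(-1));
    }
}
```

Under these assumptions, resetting to 0 makes the first lookup land on index 1, while resetting to -1 makes it land on index 0. Since the ring is shuffled and refreshed periodically, skipping one slot per refresh has little practical effect, which matches the remark that it doesn't hurt much.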


