Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2261#discussion_r134645082 --- Diff: storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java --- @@ -52,25 +78,92 @@ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<I } @Override + public void refreshLoad(LoadMapping loadMapping) { + updateRing(loadMapping); + } + + @Override public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) { - if ((lastUpdate + 1000) < System.currentTimeMillis()) { - int local_total = 0; - for (int i = 0; i < targets.length; i++) { - int val = (int)(101 - (load.get(targets[i]) * 100)); - loads[i] = val; - local_total += val; + int rightNow; + while (true) { + rightNow = current.incrementAndGet(); + if (rightNow < CAPACITY) { + return rets[choices[rightNow]]; + } else if (rightNow == CAPACITY) { + current.set(0); + return rets[choices[0]]; } - total = local_total; - lastUpdate = System.currentTimeMillis(); + //race condition with another thread, and we lost + // try again } - int selected = random.nextInt(total); - int sum = 0; - for (int i = 0; i < targets.length; i++) { - sum += loads[i]; - if (selected < sum) { - return rets[i]; + } + + private void updateRing(LoadMapping load) { + int localTotal = 0; + for (int i = 0 ; i < targets.length; i++) { + int val = (int)(101 - (load.get(targets[i]) * 100)); + loads[i] = val; + localTotal += val; + } + + int currentIdx = 0; + int unassignedIdx = 0; + for (int i = 0 ; i < loads.length ; i++) { + if (currentIdx == CAPACITY) { + break; } + + int loadForTask = loads[i]; + int amount = Math.round(loadForTask * 1.0f * CAPACITY / localTotal); + // assign at least one for task + if (amount == 0) { + unassigned[unassignedIdx++] = i; + } + for (int j = 0; j < amount; j++) { + if (currentIdx == CAPACITY) { + break; + } + + prepareChoices[currentIdx++] = i; + } + } + + if (currentIdx < CAPACITY) { + // if there're some rooms, 
give unassigned tasks a chance to be included + // this should be really small amount, so just add them sequentially + if (unassignedIdx > 0) { + for (int i = currentIdx ; i < CAPACITY ; i++) { + prepareChoices[i] = unassigned[(i - currentIdx) % unassignedIdx]; + } + } else { + // just pick random + for (int i = currentIdx ; i < CAPACITY ; i++) { + prepareChoices[i] = random.nextInt(loads.length); + } + } + } + + shuffleArray(prepareChoices); + + // swapping two arrays + int[] tempForSwap = choices; + choices = prepareChoices; + prepareChoices = tempForSwap; + + current.set(0); --- End diff -- Again, logically this should be -1, because `chooseTasks()` calls **increment and get** before indexing (so resetting to 0 makes the first lookup after a refresh use index 1, not 0), and, unlike in `chooseTasks()`, we don't read the value in this method. It doesn't hurt much anyway, though.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---