// One pre-computed key per subtask; see createRebalanceKeys.
Integer[] rebalanceKeys = createRebalanceKeys(parallelism);
// ThreadLocalRandom avoids allocating and seeding a new Random for every pick.
// NOTE(review): draw this value once per record and store it on the record; a key
// selector that returns a fresh random value on each call is non-deterministic, which
// is what the "Key group is not in KeyGroupRange" error warns about - confirm where
// this snippet runs.
int rebalanceKeyIndex = java.util.concurrent.ThreadLocalRandom.current().nextInt(parallelism);
Integer key = rebalanceKeys[rebalanceKeyIndex];
/** * 构建均衡 KEY 数组 * * @param parallelism 并行度 * @return */ public static Integer[] createRebalanceKeys(int parallelism) { HashMap<Integer, LinkedHashSet<Integer>> groupRanges = new HashMap<>(); int maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism); // 构造多个 key 用于生成足够的 groupRanges int maxRandomKey = parallelism * 10; for (int randomKey = 0; randomKey < maxRandomKey; randomKey++) { int subtaskIndex = KeyGroupRangeAssignment.assignKeyToParallelOperator(randomKey, maxParallelism, parallelism); LinkedHashSet<Integer> randomKeys = groupRanges.computeIfAbsent(subtaskIndex, k -> new LinkedHashSet<>()); randomKeys.add(randomKey); } Integer[] result = new Integer[parallelism]; for (int i = 0; i < parallelism; i++) { LinkedHashSet<Integer> ranges = groupRanges.get(i); if (ranges == null || ranges.isEmpty()) { throw new RuntimeException("create rebalance keys error"); } result[i] = ranges.stream().findFirst().get(); } return result; } 发件人: junjie.m...@goupwith.com 发送时间: 2022-09-09 17:52 收件人: user-zh 主题: Re: Re: Key group is not in KeyGroupRange key selector中使用random.nextInt(parallelism) 有时会报错 From: yue ma Date: 2022-09-09 17:41 To: user-zh Subject: Re: Key group is not in KeyGroupRange 你好,可以看一下使用的 key selector 是否稳定,key 是否会变化。 junjie.m...@goupwith.com <junjie.m...@goupwith.com> 于2022年9月9日周五 17:35写道: > hi: > 本人遇到了这个报错: > Key group 51 is not in KeyGroupRange{startKeyGroup=64, endKeyGroup=127}. > Unless you're directly using low level state access APIs, this is most > likely caused by non-deterministic shuffle key (hashCode and equals > implementation). > > 这个报错出现的莫名其妙,想了解这个错误是在什么样的情况下才会导致这个报错的? > 谢谢!! > >