Integer[] rebalanceKeys = createRebalanceKeys(parallelism);
int rebalanceKeyIndex = new Random().nextInt(parallelism);
Integer key = rebalanceKeys[rebalanceKeyIndex];

    /**
     * Builds an array of balanced keys: element i is a key that Flink routes to subtask i.
     * Assumes the job runs with the default max parallelism for the given parallelism.
     *
     * @param parallelism operator parallelism
     * @return an array of length {@code parallelism}, one key per subtask index
     */
    public static Integer[] createRebalanceKeys(int parallelism) {
        HashMap<Integer, LinkedHashSet<Integer>> groupRanges = new HashMap<>();
        int maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
        // Try several candidate keys so that every subtask index is hit at least once
        int maxRandomKey = parallelism * 10;
        for (int randomKey = 0; randomKey < maxRandomKey; randomKey++) {
            int subtaskIndex = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    randomKey, maxParallelism, parallelism);
            LinkedHashSet<Integer> randomKeys =
                    groupRanges.computeIfAbsent(subtaskIndex, k -> new LinkedHashSet<>());
            randomKeys.add(randomKey);
        }

        Integer[] result = new Integer[parallelism];
        for (int i = 0; i < parallelism; i++) {
            LinkedHashSet<Integer> ranges = groupRanges.get(i);
            if (ranges == null || ranges.isEmpty()) {
                throw new RuntimeException("create rebalance keys error: no key found for subtask " + i);
            }
            // Take the first candidate key that maps to subtask i
            result[i] = ranges.stream().findFirst().get();
        }
        return result;
    }

 
From: junjie.m...@goupwith.com
Sent: 2022-09-09 17:52
To: user-zh
Subject: Re: Re: Key group is not in KeyGroupRange
Using random.nextInt(parallelism) inside the key selector sometimes triggers this error.
 
From: yue ma
Date: 2022-09-09 17:41
To: user-zh
Subject: Re: Key group is not in KeyGroupRange
Hello, please check whether the key selector you are using is stable, i.e. whether the key it returns for a given record can change.
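
To illustrate what "stable" means here: Flink evaluates the key selector for a record on the sending side to choose the target subtask, and again on the receiving side to set the key context, so both calls have to return the same key. Below is a minimal, Flink-free sketch of the difference. The names Event, balanceKey, unstableSelector, STABLE_SELECTOR and isStable are hypothetical and exist only for this illustration. The idea is to pick the random key once, store it on the record, and let the selector only read it.

```java
import java.util.Random;
import java.util.function.Function;

class StableKeyDemo {
    /** Hypothetical record type; the balancing key is stored on the record itself. */
    static final class Event {
        final String payload;
        final int balanceKey;

        Event(String payload, int balanceKey) {
            this.payload = payload;
            this.balanceKey = balanceKey;
        }
    }

    /** Unstable: each call draws a new random number, even for the same record. */
    static Function<Event, Integer> unstableSelector(int parallelism) {
        Random random = new Random();
        return event -> random.nextInt(parallelism);
    }

    /** Stable: the key was chosen once upstream; the selector only reads it. */
    static final Function<Event, Integer> STABLE_SELECTOR = event -> event.balanceKey;

    /** Returns true if the selector yields the same key on every call for this record. */
    static boolean isStable(Function<Event, Integer> selector, Event event, int calls) {
        Integer first = selector.apply(event);
        for (int i = 1; i < calls; i++) {
            if (!first.equals(selector.apply(event))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int parallelism = 4; // assumed value, for illustration only
        // The random choice happens exactly once, when the record is built.
        Event event = new Event("a", new Random().nextInt(parallelism));
        System.out.println("stable selector:   " + isStable(STABLE_SELECTOR, event, 1000));
        System.out.println("unstable selector: " + isStable(unstableSelector(parallelism), event, 1000));
    }
}
```

In a real job the same idea would mean assigning the key in an upstream map step and having keyBy read that field, rather than calling Random inside the key selector.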
On Fri, Sep 9, 2022 at 17:35, junjie.m...@goupwith.com <junjie.m...@goupwith.com> wrote:
> hi:
> I ran into this error:
> Key group 51 is not in KeyGroupRange{startKeyGroup=64, endKeyGroup=127}.
> Unless you're directly using low level state access APIs, this is most
> likely caused by non-deterministic shuffle key (hashCode and equals
> implementation).
>
> This error appeared out of nowhere. Under what circumstances does it occur?
> Thanks!!
>
>
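
For readers trying to decode the numbers in that message, here is a rough sketch of the key-group arithmetic. It assumes parallelism 2 and max parallelism 128, which is consistent with the range 64..127 in the error but is not stated in the thread. The two formulas mirror, as far as I know, the integer arithmetic in Flink's KeyGroupRangeAssignment; the exact expressions may differ between Flink versions. The class and method names below are hypothetical.

```java
class KeyGroupRangeDemo {
    /** Index of the subtask that owns the given key group. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Inclusive {start, end} key-group range owned by the given subtask. */
    static int[] rangeForOperator(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed
        int parallelism = 2;      // assumed

        for (int subtask = 0; subtask < parallelism; subtask++) {
            int[] range = rangeForOperator(maxParallelism, parallelism, subtask);
            System.out.println("subtask " + subtask + " owns key groups " + range[0] + ".." + range[1]);
        }
        // Key group 51 belongs to subtask 0, yet the error was raised by the
        // subtask owning 64..127, i.e. the record was routed with one key and
        // processed with another.
        System.out.println("key group 51 -> subtask "
                + operatorIndexForKeyGroup(maxParallelism, parallelism, 51));
    }
}
```

Under these assumptions subtask 0 owns key groups 0..63 and subtask 1 owns 64..127, so a record whose key hashes to key group 51 at processing time should never have arrived at subtask 1 unless its key changed after routing.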
