默认的 shard assigner public static final KinesisShardAssigner DEFAULT_SHARD_ASSIGNER = (shard, subtasks) -> shard.hashCode();
如何shard 的数量 大于 并发度 很容易造成分布不均。 想着用这种方法,在主类使用 static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>(); static AtomicInteger counter = new AtomicInteger(0); public static final KinesisShardAssigner origin_shard = (shard, subtasks) -> { String shardId = shard.getShard().getShardId(); Integer index = map.get(shardId); if (index != null){ return index; } else{ counter.getAndIncrement(); Integer new_index = counter.get(); map.put(shardId, new_index); return new_index; } }; flinkKinesisConsumer.setShardAssigner(origin_shard); 虽然试验了一把没有问题。 但是感觉 这段代码 最终会运行在 task slot 里面。 这种方法的有效性 是不是*依赖 kinesis list shards api 返回的顺序固定*呢? 有没有不依赖 api 返回的。又能均匀分布的方法? 虽然问题是 kinesis 。但是感觉对 数据源 和 slot 并发读取多源的其他场景或许有相似之处。 初来社区。欢迎给出建议。 谢谢。