默认的 shard assigner

public static final KinesisShardAssigner DEFAULT_SHARD_ASSIGNER =
(shard, subtasks) -> shard.hashCode();

如何shard 的数量 大于 并发度 很容易造成分布不均。

想着用这种方法,在主类使用

static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
static AtomicInteger counter = new AtomicInteger(0);

public static final KinesisShardAssigner origin_shard  =
      (shard, subtasks) -> {
         String shardId = shard.getShard().getShardId();
         Integer index  = map.get(shardId);
         if (index != null){
            return index;
         }
         else{
            counter.getAndIncrement();
            Integer new_index = counter.get();
            map.put(shardId, new_index);
            return new_index;
         }
      };


flinkKinesisConsumer.setShardAssigner(origin_shard);

虽然试验了一把没有问题。 但是感觉 这段代码 最终会运行在 task slot 里面。

这种方法的有效性 是不是*依赖 kinesis list shards api 返回的顺序固定*呢?

有没有不依赖 api 返回的。又能均匀分布的方法?


虽然问题是 kinesis 。但是感觉对 数据源 和 slot 并发读取多源的其他场景或许有相似之处。


初来社区。欢迎给出建议。


谢谢。

回复