I wrote a FlinkRoundRobinPartitioner that extends FlinkKafkaPartitioner and
use it as follows:

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic(sinkTopic)
        .setValueSerializationSchema(new SimpleStringSchema())
        .setPartitioner(new FlinkRoundRobinPartitioner<>()).build())
    .build();

But when the number of partitions on the topic is increased, no data is
written to the new partitions.
I looked at the source code, and it seems this is because KafkaSinkContext
cannot retrieve the partition list dynamically.
Is there any way to fix this?
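
To illustrate what I mean, here is a minimal, self-contained sketch of the
round-robin logic in plain Java with no Flink dependency. The class and method
names (RoundRobinSketch, countPerPartition) are hypothetical and are not my
actual partitioner or any Flink API. It assumes, as the source code seems to
suggest, that the partitioner is only ever handed the partition array that was
fetched once at startup:

```java
/** Hypothetical stand-in for the round-robin logic; not the real Flink classes. */
public class RoundRobinSketch {
    // Index of the next slot to use within the partition array we are given.
    private int next = 0;

    /** Picks the next partition in turn from whatever array the caller passes in. */
    public int partition(int[] partitions) {
        int chosen = partitions[next % partitions.length];
        next = (next + 1) % partitions.length;
        return chosen;
    }

    /**
     * Sends `records` records through a fresh partitioner that only sees
     * `visiblePartitions`, and returns how many records landed on each of the
     * `totalPartitions` partitions that actually exist on the topic.
     */
    public static int[] countPerPartition(int[] visiblePartitions, int totalPartitions, int records) {
        RoundRobinSketch partitioner = new RoundRobinSketch();
        int[] counts = new int[totalPartitions];
        for (int i = 0; i < records; i++) {
            counts[partitioner.partition(visiblePartitions)]++;
        }
        return counts;
    }
}
```

If the topic has grown from 2 to 4 partitions but the array passed in is still
{0, 1}, then countPerPartition(new int[]{0, 1}, 4, 8) puts all 8 records on
partitions 0 and 1 and none on 2 and 3, which matches what I observe. With the
up-to-date array {0, 1, 2, 3}, the same call spreads the records evenly.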

