I wrote a FlinkRoundRobinPartitioner extends FlinkKafkaPartitioner and use
it as following:

KafkaSink kafkaSink = KafkaSink.builder()
.setBootstrapServers(sinkServers).setKafkaProducerConfig(props)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(sinkTopic).setValueSerializationSchema(new SimpleStringSchema())
.setPartitioner(new FlinkRoundRobinPartitioner<>()).build()).build();

But when the partition number is changed(becomes larger), no data is
written to the new partitions.
I looked at the source code, it seems because the KafkaSinkContext can not
retrieve the partitions dynamically.
Is there any way to fix this?

Thanks,
Lei

Reply via email to