It seems I don't need to define a FlinkRoundRobinPartitioner; I can just use the
RoundRobinPartitioner supplied with Kafka:
props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG,
RoundRobinPartitioner.class.getName());

This way the new partitions are discovered dynamically. However,
there's a bug in RoundRobinPartitioner, so the distribution is still uneven:
https://issues.apache.org/jira/browse/KAFKA-9965
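
A minimal sketch of why this picks up new partitions, in plain Java with no
Kafka or Flink dependency. The class RoundRobinSketch and its IntSupplier
argument are hypothetical stand-ins (not the Kafka or Flink classes); the
assumption is that the partitioner asks for the current partition count on
every call rather than caching it once at startup:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

/** Hypothetical illustration only; not Kafka's RoundRobinPartitioner. */
final class RoundRobinSketch {
    // Shared counter, incremented once per record.
    private final AtomicInteger counter = new AtomicInteger(0);
    // Stand-in for a metadata lookup that returns the current partition count.
    private final IntSupplier partitionCount;

    RoundRobinSketch(IntSupplier partitionCount) {
        this.partitionCount = partitionCount;
    }

    /** Re-reads the partition count on every call, so added partitions get used. */
    int nextPartition() {
        int n = partitionCount.getAsInt();
        // floorMod keeps the result non-negative even after the counter overflows.
        return Math.floorMod(counter.getAndIncrement(), n);
    }

    public static void main(String[] args) {
        int[] count = {3};                      // topic starts with 3 partitions
        RoundRobinSketch rr = new RoundRobinSketch(() -> count[0]);
        for (int i = 0; i < 3; i++) {
            System.out.println(rr.nextPartition());
        }
        count[0] = 5;                           // partitions are added later
        for (int i = 0; i < 5; i++) {
            System.out.println(rr.nextPartition());
        }
    }
}
```

A partitioner that captured the partition count once, when the job started,
would keep cycling over the original partitions only, which matches the
behaviour described in the quoted message below.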

On Tue, Jul 30, 2024 at 4:20 PM Lei Wang <leiwang...@gmail.com> wrote:

> I wrote a FlinkRoundRobinPartitioner that extends FlinkKafkaPartitioner and
> use it as follows:
>
> KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
>     .setBootstrapServers(sinkServers)
>     .setKafkaProducerConfig(props)
>     .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>     .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>         .setTopic(sinkTopic)
>         .setValueSerializationSchema(new SimpleStringSchema())
>         .setPartitioner(new FlinkRoundRobinPartitioner<>())
>         .build())
>     .build();
>
> But when the number of partitions is increased, no data is
> written to the new partitions.
> Looking at the source code, this seems to be because the KafkaSinkContext
> cannot retrieve the partitions dynamically.
> Is there any way to fix this?
>
> Thanks,
> Lei
>
