Hi, I have a source Kafka and a sink Kafka. When the amount of data to process
grows, I need to increase the Kafka topic's partition count, but I don't want
to have to restart the job for the change to take effect.
For the source Kafka, I set flink.partition-discovery.interval-millis, and the
consumer starts reading from the new partitions after I expand the topic's
partition count.
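For reference, this is roughly how I enable partition discovery on the consumer side (a minimal sketch; broker address, group id, and the 30-second interval are just placeholder values):

```java
import java.util.Properties;

public class PartitionDiscoveryConfig {
    // Build consumer properties with partition discovery enabled.
    public static Properties consumerProps() {
        Properties props = new Properties();
        // Placeholder broker address; replace with your cluster.
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "example-group");
        // Re-check the source topic for new partitions every 30 seconds.
        props.setProperty("flink.partition-discovery.interval-millis", "30000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(
            consumerProps().getProperty("flink.partition-discovery.interval-millis"));
    }
}
```

These properties are then passed to the FlinkKafkaConsumer constructor as usual.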


But the sink Kafka does not work this way.


The Flink Kafka producer fetches the topic's partition list once and caches it
in topicPartitionsMap, as shown in class FlinkKafkaProducer<IN>:


int[] partitions = (int[]) this.topicPartitionsMap.get(targetTopic);
if (null == partitions) {
    partitions = getPartitionsByTopic(targetTopic, transaction.producer);
    this.topicPartitionsMap.put(targetTopic, partitions);
}

When the Kafka topic is expanded, the new partitions are never discovered. For
example, when expanding from 1 partition to 2 partitions, the cached partition
list is never updated until the job restarts. Are there plans to improve this,
or is there any other way to achieve this?
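To illustrate the kind of behavior I am after: a time-based cache that re-fetches the partition list after a TTL, instead of caching it forever. This is only a hypothetical sketch of the idea, not part of Flink's API; RefreshingPartitionCache and the fetcher function are names I made up:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical TTL cache for topic -> partition list.
// Flink's topicPartitionsMap, by contrast, caches forever.
public class RefreshingPartitionCache {
    private static class Entry {
        final int[] partitions;
        final long fetchedAt;
        Entry(int[] partitions, long fetchedAt) {
            this.partitions = partitions;
            this.fetchedAt = fetchedAt;
        }
    }

    private final Map<String, Entry> cache = new HashMap<>();
    private final long ttlMillis;
    // Stands in for a real lookup such as getPartitionsByTopic(...).
    private final Function<String, int[]> fetcher;

    public RefreshingPartitionCache(long ttlMillis, Function<String, int[]> fetcher) {
        this.ttlMillis = ttlMillis;
        this.fetcher = fetcher;
    }

    // Return the cached list, re-fetching it once the entry is older than the TTL.
    public int[] partitionsFor(String topic) {
        long now = System.currentTimeMillis();
        Entry entry = cache.get(topic);
        if (entry == null || now - entry.fetchedAt > ttlMillis) {
            entry = new Entry(fetcher.apply(topic), now);
            cache.put(topic, entry);
        }
        return entry.partitions;
    }
}
```

With something like this inside the producer, newly added partitions would start receiving records within one TTL instead of only after a job restart.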
