Hi, I have a source Kafka and a sink Kafka. As the volume of data being processed grows, I need to increase the number of partitions of the Kafka topics, but I don't want to restart the job for the change to take effect. For the source Kafka, I use flink.partition-discovery.interval-millis, and the job starts consuming the new partitions after I expand the topic's partition count.
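For reference, this is roughly how the source side is configured. It is only a sketch: the broker address, group id, and the 30-second interval are placeholder values, and the class and method names are made up for illustration. Only the property key flink.partition-discovery.interval-millis is the actual setting I rely on.

```java
import java.util.Properties;

public class SourceDiscoveryConfig {

    /** Builds the consumer properties that are later passed to the FlinkKafkaConsumer constructor. */
    public static Properties consumerProperties() {
        Properties props = new Properties();
        // Placeholder connection settings, not my real values.
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");
        // Ask the Flink Kafka source to look for new partitions every 30 seconds (example value).
        props.setProperty("flink.partition-discovery.interval-millis", "30000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProperties().getProperty("flink.partition-discovery.interval-millis"));
    }
}
```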
But the sink Kafka doesn't work like this. The Flink Kafka producer fetches the topic's partition list and caches it in topicPartitionsMap, as shown in class FlinkKafkaProducer<IN>:

    int[] partitions = (int[]) this.topicPartitionsMap.get(targetTopic);
    if (null == partitions) {
        partitions = getPartitionsByTopic(targetTopic, transaction.producer);
        this.topicPartitionsMap.put(targetTopic, partitions);
    }

When the Kafka topic is expanded, the new partitions cannot be discovered. For example, when expanding from 1 partition to 2 partitions, the partition list we get is never updated until the job restarts.

Are there plans to improve this, or is there any other way to achieve this?
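To make the caching behaviour concrete, here is a simplified, self-contained model of it. This is not the actual Flink code: PartitionCacheSketch, metadataLookup, and the simulated broker state are hypothetical stand-ins, with metadataLookup playing the role of getPartitionsByTopic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class PartitionCacheSketch {

    // Stand-in for the topicPartitionsMap field: topic -> cached partition ids.
    private final Map<String, int[]> topicPartitionsMap = new HashMap<>();

    // Hypothetical stand-in for getPartitionsByTopic(targetTopic, producer).
    private final Function<String, int[]> metadataLookup;

    public PartitionCacheSketch(Function<String, int[]> metadataLookup) {
        this.metadataLookup = metadataLookup;
    }

    /** Returns the partition list for a topic, querying metadata only on the first call. */
    public int[] partitionsFor(String targetTopic) {
        int[] partitions = topicPartitionsMap.get(targetTopic);
        if (partitions == null) {
            partitions = metadataLookup.apply(targetTopic);
            topicPartitionsMap.put(targetTopic, partitions);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Simulated broker state: the topic starts with a single partition.
        int[] brokerPartitionCount = {1};
        PartitionCacheSketch sink = new PartitionCacheSketch(topic -> {
            int[] ids = new int[brokerPartitionCount[0]];
            for (int i = 0; i < ids.length; i++) {
                ids[i] = i;
            }
            return ids;
        });

        System.out.println(sink.partitionsFor("out").length); // first lookup fills the cache
        brokerPartitionCount[0] = 2;                          // the topic is expanded
        System.out.println(sink.partitionsFor("out").length); // still served from the cache
    }
}
```

In this model both calls report one partition, because the second call is answered from the map and the lookup is never repeated. That is the behaviour I see in the sink after expanding the topic.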