Hi,

May I ask why there is no size limit on `producerPool` in KafkaWriter? As far as I can tell, the current logic creates a new producer when the KafkaWriter is created and at the start of each checkpoint (when snapshotState is invoked) if the producer pool is empty, and adds the producer back to the pool after KafkaCommitter#commit(). However, these producers are only closed when the KafkaWriter itself is closed.
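To make the question concrete, here is a minimal sketch of the kind of bounded pool I have in mind. It is not Flink's actual code: `BoundedPool`, `FakeProducer`, and the `maxIdle` parameter are hypothetical names used only for illustration, and the lifecycle (create when the pool is empty, recycle after commit, close everything when the writer closes) follows my understanding described above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

/** Hypothetical bounded pool; a simplified model, not Flink's producerPool. */
final class BoundedPool<T extends AutoCloseable> implements AutoCloseable {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;
    private final int maxIdle; // assumed cap on idle instances kept for reuse
    private int created;
    private int closed;

    BoundedPool(Supplier<T> factory, int maxIdle) {
        this.factory = factory;
        this.maxIdle = maxIdle;
    }

    /** Reuse an idle instance if one exists, otherwise create a new one. */
    T acquire() {
        T pooled = idle.pollFirst();
        if (pooled != null) {
            return pooled;
        }
        created++;
        return factory.get();
    }

    /** Called after commit: keep the instance if there is room, else close it. */
    void recycle(T resource) {
        if (idle.size() < maxIdle) {
            idle.addFirst(resource);
        } else {
            closeAndCount(resource);
        }
    }

    int idleCount() { return idle.size(); }
    int createdCount() { return created; }
    int closedCount() { return closed; }

    /** Called when the writer shuts down: close whatever is still idle. */
    @Override
    public void close() {
        T resource;
        while ((resource = idle.pollFirst()) != null) {
            closeAndCount(resource);
        }
    }

    private void closeAndCount(T resource) {
        try {
            resource.close();
            closed++;
        } catch (Exception e) {
            throw new IllegalStateException("Failed to close pooled resource", e);
        }
    }
}

/** Stand-in for a transactional producer; only records whether it was closed. */
final class FakeProducer implements AutoCloseable {
    boolean closed;

    @Override
    public void close() {
        closed = true;
    }
}
```

With a cap like this, a producer that comes back from the committer while the pool is already full would be closed right away instead of staying open until the writer is closed. An idle timeout, as in a thread pool, would be another option; the sketch only shows the simpler size limit.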
I'm using Flink 1.16 and have run into a problem: there are too many Kafka connections and many kafka-producer-network-thread threads. I reproduced it with a simple demo that consumes data from datagen and writes to Kafka with EXACTLY_ONCE semantics with parallelism. Checking the thread dump and the task manager log, I see 200+ kafka-producer-network-thread threads and as many `create new transactional producer` log entries, but only one `Closing the Kafka producer` log entry.

The demo is as follows:

    CREATE TABLE source_table (
        id INT,
        name STRING,
        age INT,
        event_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.id.min' = '1',
        'fields.id.max' = '10000000',
        'fields.name.length' = '10',
        'fields.age.min' = '18',
        'fields.age.max' = '60'
    );

    CREATE TABLE kafka_sink (
        id INT,
        name STRING,
        age INT,
        event_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'flink-demo',
        'properties.bootstrap.servers' = '192.168.0.13:9092,192.168.0.14:9092',
        'format' = 'json',
        'sink.partitioner' = 'fixed',
        'sink.delivery-guarantee' = 'exactly-once',
        'sink.transactional-id-prefix' = 'kafka-demo',
        'properties.transaction.timeout.ms' = '300000'
    );

    set execution.checkpointing.interval=10s;

    INSERT INTO kafka_sink
    SELECT id, name, age, event_time FROM source_table;

I also found that in the older version the KafkaCommitter closed the producer after commit. Now the producer is recycled by adding it back to the pool, but there is no size limit and no other mechanism to close idle producers, as a thread pool would have. Is there a specific consideration behind this design?

Best Regards