Hi,

May I ask a question about why there is no size limit on the
`producerPool` in KafkaWriter? It seems that the current logic creates a
new producer when the KafkaWriter is created and at the start of each
checkpoint (when snapshotState is invoked) if the producer pool is empty,
then adds the producer back into the pool after KafkaCommitter#commit(),
but only closes these producers when the KafkaWriter itself is closed.
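
To make sure I am reading the code correctly, here is the pattern I mean as
a simplified Java sketch. The class and method names below are mine for
illustration only, not the actual KafkaWriter/KafkaCommitter internals:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    // Simplified sketch of the recycling pattern as I understand it.
    class PooledProducerSketch implements AutoCloseable {
        private final Deque<KafkaProducer<byte[], byte[]>> producerPool = new ArrayDeque<>();
        private final Properties kafkaProps;

        PooledProducerSketch(Properties kafkaProps) {
            this.kafkaProps = kafkaProps;
        }

        // Called on every checkpoint (snapshotState): reuse a pooled producer
        // if one exists, otherwise create a brand-new transactional producer.
        KafkaProducer<byte[], byte[]> getOrCreateProducerForCheckpoint() {
            KafkaProducer<byte[], byte[]> producer = producerPool.poll();
            if (producer == null) {
                // No upper bound here: a new producer (and its
                // kafka-producer-network-thread) is created whenever the pool is empty.
                producer = new KafkaProducer<>(
                        kafkaProps, new ByteArraySerializer(), new ByteArraySerializer());
            }
            return producer;
        }

        // Called after the committer finishes the transaction: the producer is
        // returned to the pool instead of being closed.
        void recycleAfterCommit(KafkaProducer<byte[], byte[]> producer) {
            producerPool.add(producer);
        }

        // Producers are only closed here, when the writer itself is closed.
        @Override
        public void close() {
            KafkaProducer<byte[], byte[]> producer;
            while ((producer = producerPool.poll()) != null) {
                producer.close();
            }
        }
    }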

I'm using Flink 1.16, and I ran into a problem where there are too many
Kafka connections and many kafka-producer-network-thread threads.
I tested again with a simple demo that consumes data from datagen and
writes to Kafka with EXACTLY_ONCE semantics and parallelism. Checking the
thread dump and the task manager log, there are 200+
kafka-producer-network-thread threads and `create new transactional producer`
log entries, but only one `Closing the Kafka producer` log entry was found.
The demo is as follows:
CREATE TABLE source_table (
id INT,
name STRING,
age INT,
event_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.id.min' = '1',
'fields.id.max' = '10000000',
'fields.name.length' = '10',
'fields.age.min' = '18',
'fields.age.max' = '60'
);

CREATE TABLE kafka_sink (
id INT,
name STRING,
age INT,
event_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'flink-demo',
'properties.bootstrap.servers' = '192.168.0.13:9092,192.168.0.14:9092',
'format' = 'json',
'sink.partitioner' = 'fixed',
'sink.delivery-guarantee' = 'exactly-once',
'sink.transactional-id-prefix' = 'kafka-demo',
'properties.transaction.timeout.ms' = '300000'
);

set execution.checkpointing.interval=10s;
INSERT INTO kafka_sink SELECT id, name, age, event_time FROM source_table;

I also found that in the old version, the KafkaCommitter closed the
producer after commit, whereas now the producer is recycled by adding it
back to the pool, but without any limit on the pool size or any mechanism
to close idle producers, like the thread pool logic. Is there any specific
consideration behind this?
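
For comparison, what I mean by "like the thread pool logic" is something
along these lines, just an illustrative Java sketch with a made-up limit,
not an actual patch against the connector:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import org.apache.kafka.clients.producer.KafkaProducer;

    // Illustrative only: a bounded recycle step that closes producers above a
    // configured pool size instead of keeping every idle producer alive.
    class BoundedRecycleSketch {
        private static final int MAX_POOLED_PRODUCERS = 5; // hypothetical limit
        private final Deque<KafkaProducer<byte[], byte[]>> producerPool = new ArrayDeque<>();

        void recycleAfterCommit(KafkaProducer<byte[], byte[]> producer) {
            if (producerPool.size() < MAX_POOLED_PRODUCERS) {
                producerPool.add(producer); // keep it for reuse
            } else {
                producer.close(); // drop the extra idle producer and its network thread
            }
        }
    }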

Best Regards
