Good morning,

Any updates or progress on this issue?
BR,
Danny

On Sun, Feb 4, 2024 at 13:20 Daniel Peled <daniel.peled.w...@gmail.com> wrote:

> Hello,
>
> We have noticed that when we add a *new Kafka sink* operator to the
> graph *and start from the last savepoint*, the operator is 100% busy
> for several minutes, *sometimes even half an hour to an hour*!
>
> The problematic code appears to be the following for-loop in the
> getTransactionalProducer() method:
>
> org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer
>
>     private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
>         checkState(
>                 checkpointId > lastCheckpointId,
>                 "Expected %s > %s",
>                 checkpointId,
>                 lastCheckpointId);
>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>         // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
>         // this loop ensures that all gaps are filled with initialized (empty) transactions
>         for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
>             String transactionalId =
>                     TransactionalIdFactory.buildTransactionalId(
>                             transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
>             producer = getOrCreateTransactionalProducer(transactionalId);
>         }
>         this.lastCheckpointId = checkpointId;
>         assert producer != null;
>         LOG.info("Created new transactional producer {}", producer.getTransactionalId());
>         return producer;
>     }
>
> Since we added a new sink operator, the lastCheckpointId is 1,
> and if, for example, the checkpointId is 20000,
> the loop will be executed roughly 20000 times!
>
> We have several questions:
> 1. Is this behaviour expected?
> 2. Are we doing something wrong?
> 3. Is there a way to avoid this behaviour?
>
> Best Regards,
> Danny
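
For anyone triaging this, here is a minimal standalone sketch (plain Java, not Flink internals; initTransaction() is a hypothetical stand-in for the per-id work that getOrCreateTransactionalProducer() performs) that just illustrates the iteration count we are seeing when restoring with a large checkpoint-id gap:

public class GapFillLoopSketch {

    // Hypothetical stand-in: in the real writer, each id triggers
    // producer creation and transaction initialization against the
    // Kafka broker, so total cost grows linearly with the gap size.
    static void initTransaction(String transactionalId) {
    }

    public static void main(String[] args) {
        long lastCheckpointId = 1L;   // fresh operator state after adding the new sink
        long checkpointId = 20_000L;  // checkpoint id restored from the savepoint

        long iterations = 0;
        // Mirrors the gap-filling loop in KafkaWriter#getTransactionalProducer
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            initTransaction("prefix-0-" + id);
            iterations++;
        }
        System.out.println("Gap-filling iterations: " + iterations); // prints 19999
    }
}

The loop itself is cheap; the stall comes from whatever per-transactional-id work happens inside each iteration, repeated once for every checkpoint id in the gap.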