Hello,

We have noticed that when we add a *new Kafka sink* operator to the graph *and start from the last savepoint*, the operator is 100% busy for several minutes, and sometimes *for as long as 30 to 60 minutes*.
The problematic code seems to be the following for-loop in the getTransactionalProducer() method:

*org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer*

    private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
        checkState(
                checkpointId > lastCheckpointId,
                "Expected %s > %s",
                checkpointId,
                lastCheckpointId);
        FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
        // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
        // this loop ensures that all gaps are filled with initialized (empty) transactions
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            String transactionalId =
                    TransactionalIdFactory.buildTransactionalId(
                            transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
            producer = getOrCreateTransactionalProducer(transactionalId);
        }
        this.lastCheckpointId = checkpointId;
        assert producer != null;
        LOG.info("Created new transactional producer {}", producer.getTransactionalId());
        return producer;
    }

Since we added a new sink operator, lastCheckpointId is 1. If, for example, checkpointId is 20000, the loop runs roughly 20000 times, and each iteration calls getOrCreateTransactionalProducer().

We have several questions:

1. Is this behaviour expected?
2. Are we doing something wrong?
3. Is there a way to avoid this behaviour?

Best Regards,
nick
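
P.S. To make the iteration count concrete, here is a minimal standalone sketch that only reproduces the loop bounds from getTransactionalProducer(). It is not Flink code: the class GapFillSketch, its helper methods, the prefix "my-prefix" and the "prefix-subtask-checkpoint" id layout are assumptions made for illustration, and no Kafka producers are created. With lastCheckpointId = 1 and checkpointId = 20000 it builds 19999 transactional ids, one per loop iteration.

```java
import java.util.ArrayList;
import java.util.List;

class GapFillSketch {

    // Hypothetical stand-in for TransactionalIdFactory.buildTransactionalId.
    // The "prefix-subtask-checkpoint" layout is an assumption for illustration.
    static String buildTransactionalId(String prefix, int subtaskId, long checkpointId) {
        return prefix + "-" + subtaskId + "-" + checkpointId;
    }

    // Same bounds as the loop in getTransactionalProducer():
    // one id for every checkpoint in (lastCheckpointId, checkpointId].
    static List<String> gapFillIds(
            String prefix, int subtaskId, long lastCheckpointId, long checkpointId) {
        if (checkpointId <= lastCheckpointId) {
            throw new IllegalStateException(
                    "Expected " + checkpointId + " > " + lastCheckpointId);
        }
        List<String> ids = new ArrayList<>();
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            // In the real writer, each iteration also obtains a transactional producer.
            ids.add(buildTransactionalId(prefix, subtaskId, id));
        }
        return ids;
    }

    public static void main(String[] args) {
        List<String> ids = gapFillIds("my-prefix", 0, 1L, 20000L);
        System.out.println("iterations: " + ids.size());
        System.out.println("first id:   " + ids.get(0));
        System.out.println("last id:    " + ids.get(ids.size() - 1));
    }
}
```

If each iteration in the real writer costs even a fraction of a second, a count of this size would seem consistent with the busy time we observe, although we have not measured the per-iteration cost.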