Hello,

We have noticed that when we add a *new Kafka sink* operator to the graph *and
start from the last savepoint*, the operator is 100% busy for several
minutes, and sometimes *even for 30 to 60 minutes*.

The problematic code seems to be the following for-loop in the
getTransactionalProducer() method:

*org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer*

    private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
        checkState(
                checkpointId > lastCheckpointId,
                "Expected %s > %s",
                checkpointId,
                lastCheckpointId);
        FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
        // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
        // this loop ensures that all gaps are filled with initialized (empty) transactions
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            String transactionalId =
                    TransactionalIdFactory.buildTransactionalId(
                            transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
            producer = getOrCreateTransactionalProducer(transactionalId);
        }
        this.lastCheckpointId = checkpointId;
        assert producer != null;
        LOG.info("Created new transactional producer {}", producer.getTransactionalId());
        return producer;
    }


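Since we added a new sink operator, its lastCheckpointId is 1.
If, for example, the checkpointId restored from the savepoint is 20000,
the loop runs roughly 20000 times, calling getOrCreateTransactionalProducer()
once for every checkpoint id in between.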
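
To make the iteration count concrete, here is a minimal standalone sketch that
mirrors only the loop bounds from the method above. It does not touch Kafka or
Flink. The class name, the "my-sink" prefix, and the helper
buildTransactionalId() are hypothetical stand-ins, and the "prefix-subtask-id"
format is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class GapFillSketch {

    // Hypothetical stand-in for TransactionalIdFactory.buildTransactionalId;
    // the id format used here is an assumption, not taken from Flink.
    static String buildTransactionalId(String prefix, int subtaskId, long checkpointId) {
        return prefix + "-" + subtaskId + "-" + checkpointId;
    }

    // Same bounds as the loop in getTransactionalProducer():
    // one id for every checkpoint in (lastCheckpointId, checkpointId].
    static List<String> idsToInitialize(
            String prefix, int subtaskId, long lastCheckpointId, long checkpointId) {
        List<String> ids = new ArrayList<>();
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            ids.add(buildTransactionalId(prefix, subtaskId, id));
        }
        return ids;
    }

    public static void main(String[] args) {
        // Values from the example above: a new sink (lastCheckpointId = 1)
        // restored at checkpointId = 20000.
        List<String> ids = idsToInitialize("my-sink", 0, 1L, 20000L);
        System.out.println(ids.size() + " transactional ids, first = "
                + ids.get(0) + ", last = " + ids.get(ids.size() - 1));
    }
}
```

With these values the sketch yields 19999 ids per subtask. In the real writer,
each of them goes through getOrCreateTransactionalProducer(), which is where we
suspect the time is spent.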


We have several questions:
1. Is this behavior expected?
2. Are we doing something wrong?
3. Is there a way to avoid this behavior?

Best Regards,
Danny
