Hello guys,
Can someone please assist us with the following issue?
We have noticed that when we add a *new Kafka sink* operator to the graph *and
start from the last savepoint*, the operator is 100% busy for several
minutes, and sometimes *even 30-60 minutes*!
The problematic code seems to be the following for-loop in the
getTransactionalProducer() method:
*org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer*
    private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
        checkState(
                checkpointId > lastCheckpointId,
                "Expected %s > %s",
                checkpointId,
                lastCheckpointId);
        FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
        // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
        // this loop ensures that all gaps are filled with initialized (empty) transactions
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            String transactionalId =
                    TransactionalIdFactory.buildTransactionalId(
                            transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
            producer = getOrCreateTransactionalProducer(transactionalId);
        }
        this.lastCheckpointId = checkpointId;
        assert producer != null;
        LOG.info("Created new transactional producer {}", producer.getTransactionalId());
        return producer;
    }
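As far as we can tell (this is our assumption; we have not traced the full
call chain in the Flink source), each loop iteration pays roughly the cost
sketched below: building a producer for the given transactional id and
performing a blocking initTransactions() round trip to the transaction
coordinator. The class name, bootstrap address, and config values here are
placeholders of our own:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class InitTransactionsCostSketch {

        // Hypothetical stand-in for the per-id work done inside the loop.
        static KafkaProducer<byte[], byte[]> initProducer(String transactionalId) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
            // Blocking round trip to the transaction coordinator; repeating this
            // ~20000 times in a row would explain the minutes of 100% busy time.
            producer.initTransactions();
            return producer;
        }
    }

If each initTransactions() takes even a few milliseconds, 20000 sequential
calls add up to the minutes we are observing.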
Since we only just added the sink operator, its lastCheckpointId is 1,
while the restored checkpointId can be, for example, 20000.
In that case the loop executes almost 20000 times, creating a transactional
producer on every iteration!
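To make the numbers concrete, here is a minimal, self-contained sketch of the
id sequence the loop walks through for a single subtask. The "my-prefix"
value, the "-" delimiter, and the field order are assumptions on our side,
based on our reading of TransactionalIdFactory:

    public class TransactionalIdGapDemo {
        public static void main(String[] args) {
            long lastCheckpointId = 1;   // fresh state of the newly added sink
            long checkpointId = 20_000;  // checkpoint id restored from the savepoint
            int subtaskId = 0;

            long probed = 0;
            String first = null;
            String last = null;
            for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
                // Assumed id layout: <prefix>-<subtaskId>-<checkpointId>.
                String transactionalId = "my-prefix-" + subtaskId + "-" + id;
                if (first == null) {
                    first = transactionalId;
                }
                last = transactionalId;
                probed++;  // each id triggers getOrCreateTransactionalProducer(...)
            }
            System.out.printf("%d ids probed, from %s to %s%n", probed, first, last);
            // -> 19999 ids probed, from my-prefix-0-2 to my-prefix-0-20000
        }
    }

And this is per subtask; with higher sink parallelism, the cluster-wide
number of probed ids multiplies accordingly.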
We have several questions:
1. Is this behavior expected?
2. Are we doing something wrong?
3. Is there a way to avoid this behavior?
Best Regards,
Danny