Hi, Danny.
When the problem occurs, can you use a flame graph to confirm whether the loop
in this code is what keeps the operator busy?
Since I'm not particularly familiar with the Kafka connector, I can't give you
an accurate answer. I think Hang Ruan is an expert in this area :).
Hi, Ruan Hang. Can you take a look at this strange situation?
--
Best!
Xuyang
On 2024-03-10 16:49:16, "Daniel Peled" <[email protected]> wrote:
Hello,
I am sorry for addressing you personally.
I posted this question to the user mailing list but got no response.
If you can't help me, please let me know,
and please tell me who can help us.
The problem is as follows:
We have noticed that when we add a new Kafka sink operator to the job graph and
start from the last savepoint, the operator is 100% busy for several minutes,
and sometimes for 30 minutes to an hour!
The problematic code seems to be the following for-loop in
getTransactionalProducer() method:
org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer
    private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
        checkState(
                checkpointId > lastCheckpointId,
                "Expected %s > %s",
                checkpointId,
                lastCheckpointId);
        FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
        // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
        // this loop ensures that all gaps are filled with initialized (empty) transactions
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            String transactionalId =
                    TransactionalIdFactory.buildTransactionalId(
                            transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
            producer = getOrCreateTransactionalProducer(transactionalId);
        }
        this.lastCheckpointId = checkpointId;
        assert producer != null;
        LOG.info("Created new transactional producer {}", producer.getTransactionalId());
        return producer;
    }
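To make the gap-filling behaviour concrete, here is a minimal, hypothetical model of the loop above. It does not create Kafka producers; it only records which transactional ids the loop would visit for the range (lastCheckpointId, checkpointId]. The class and method names are made up for illustration, and the id format "<prefix>-<subtaskId>-<checkpointId>" is an assumption about what TransactionalIdFactory.buildTransactionalId produces, not a copy of it.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the gap-filling loop in
 * KafkaWriter#getTransactionalProducer. No Kafka calls are made; the sketch
 * only collects the transactional ids the loop would initialize.
 */
public class GapFillSketch {

    // ASSUMED id format "<prefix>-<subtaskId>-<checkpointId>"; the real
    // TransactionalIdFactory.buildTransactionalId may differ in detail.
    public static String buildTransactionalId(String prefix, int subtaskId, long checkpointId) {
        return prefix + "-" + subtaskId + "-" + checkpointId;
    }

    // Returns every id visited for the range (lastCheckpointId, checkpointId].
    public static List<String> idsToInitialize(
            String prefix, int subtaskId, long lastCheckpointId, long checkpointId) {
        if (checkpointId <= lastCheckpointId) {
            throw new IllegalStateException(
                    "Expected " + checkpointId + " > " + lastCheckpointId);
        }
        List<String> ids = new ArrayList<>();
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            // In the real writer, each iteration calls getOrCreateTransactionalProducer,
            // which is where the per-id cost would come from.
            ids.add(buildTransactionalId(prefix, subtaskId, id));
        }
        return ids;
    }

    public static void main(String[] args) {
        // A small gap: checkpoints 4, 5 and 6 all get an (empty) transaction.
        System.out.println(idsToInitialize("my-sink", 0, 3, 6));
        // prints [my-sink-0-4, my-sink-0-5, my-sink-0-6]
    }
}
```

With a small gap the loop is cheap; the cost grows linearly with the distance between lastCheckpointId and the current checkpointId, because one producer lookup or creation happens per id.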
Since we added a new sink operator, its lastCheckpointId is 1.
If, for example, checkpointId is 20000,
the loop runs 19999 times (once for every id from 2 to 20000)!
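The iteration count follows directly from the loop bounds: `for (id = lastCheckpointId + 1; id <= checkpointId; id++)` runs checkpointId - lastCheckpointId times. The small sketch below (a hypothetical helper, not part of the connector) checks that closed form against a brute-force count using the numbers from this example.

```java
public class LoopCountSketch {

    // Closed form: the loop "for (id = last + 1; id <= current; id++)"
    // runs (current - last) times when current > last, otherwise 0 times.
    public static long iterations(long lastCheckpointId, long checkpointId) {
        return Math.max(0L, checkpointId - lastCheckpointId);
    }

    // Brute-force count of the same loop, used to cross-check the closed form.
    public static long countByLooping(long lastCheckpointId, long checkpointId) {
        long count = 0;
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(iterations(1, 20000));     // prints 19999
        System.out.println(countByLooping(1, 20000)); // prints 19999
    }
}
```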
We have several questions:
1. Is this behaviour expected?
2. Are we doing something wrong?
3. Is there a way to avoid this behaviour?
Best Regards,
Danny