Hi, Xuyang & Daniel.

I have checked this part of the code, and I think this is expected behavior.
As the code comments note, this loop makes sure that the transactions
before this checkpoint id are re-created.

The situation Daniel mentioned will happen only when all checkpoints between
1 and 20000 fail. If so, we should check why these checkpoints failed.
The transactional producer is used only when the DeliveryGuarantee is
EXACTLY_ONCE. If another DeliveryGuarantee is acceptable for your use case,
you could switch to it to skip this loop.
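
If exactly-once semantics are not required, the guarantee is chosen when
building the sink. A configuration sketch (the broker address, topic name,
and `sink` variable are placeholders, assuming the current KafkaSink builder
API from flink-connector-kafka):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("my-topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                // AT_LEAST_ONCE (or NONE) does not use the transactional
                // producer, so the gap-filling loop is never entered.
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
```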

I think it is better to check whether many checkpoints have failed,
and to look at the flame graph to confirm that this code causes the busyness.
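
For reference, the cost Daniel describes depends only on how many checkpoint
ids the loop has to walk. A minimal, self-contained sketch of the iteration
count (hypothetical class and helper names, not the actual Flink code; the
real loop additionally creates a producer per id, which is what costs time):

```java
public class GapFillEstimate {

    // Hypothetical stand-in for TransactionalIdFactory.buildTransactionalId(...)
    static String buildTransactionalId(String prefix, int subtaskId, long checkpointId) {
        return prefix + "-" + subtaskId + "-" + checkpointId;
    }

    // Counts how many transactional ids the gap-filling loop in
    // KafkaWriter#getTransactionalProducer would touch.
    public static long countGapFill(long lastCheckpointId, long checkpointId) {
        long touched = 0;
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            // In the real code each iteration calls getOrCreateTransactionalProducer,
            // which may talk to the broker; here we only build the id.
            buildTransactionalId("my-prefix", 0, id);
            touched++;
        }
        return touched;
    }

    public static void main(String[] args) {
        // A fresh sink (lastCheckpointId = 1) restored at checkpoint 20000:
        System.out.println(countGapFill(1L, 20000L)); // prints 19999
    }
}
```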

Best,
Hang

Xuyang <xyzhong...@163.com> wrote on Monday, March 11, 2024 at 09:58:

> Hi, Danny.
> When the problem occurs, can you use a flame graph to confirm whether
> the loop in this code is causing the busyness?
> Since I'm not particularly familiar with kafka connector, I can't give you
> an accurate reply. I think Hang Ruan is an expert in this field :).
>
> Hi, Ruan Hang. Can you take a look at this strange situation?
>
>
> --
>     Best!
>     Xuyang
>
>
> On 2024-03-10 at 16:49:16, "Daniel Peled" <daniel.peled.w...@gmail.com> wrote:
>
> Hello,
>
> I am sorry for addressing you personally.
> I tried sending this request to the user group but got no response.
>
> If you can't help me, please let me know,
> and please tell me who can help us.
>
> The problem is as follows:
>
> We have noticed that when we add a *new Kafka sink* operator to the
> graph *and start from the last savepoint*, the operator is 100% busy
> for several minutes, *sometimes even half an hour to an hour*!!!
>
> The problematic code seems to be the following for-loop in
> getTransactionalProducer() method:
>
>
> *org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer*
>
> private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
>     checkState(
>             checkpointId > lastCheckpointId,
>             "Expected %s > %s",
>             checkpointId,
>             lastCheckpointId);
>     FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>     // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
>     // this loop ensures that all gaps are filled with initialized (empty) transactions
>     for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
>         String transactionalId =
>                 TransactionalIdFactory.buildTransactionalId(
>                         transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
>         producer = getOrCreateTransactionalProducer(transactionalId);
>     }
>     this.lastCheckpointId = checkpointId;
>     assert producer != null;
>     LOG.info("Created new transactional producer {}", producer.getTransactionalId());
>     return producer;
> }
>
>
> Since we added a new sink operator, the lastCheckpointId is 1,
> and if, for example, the checkpointId is 20000,
> the loop will be executed nearly 20000 times !!!
>
>
> We have several questions:
> 1. Is this behaviour expected ?
> 2. Are we doing something wrong ?
> 3. Is there a way to avoid this behavior ?
>
> Best Regards,
> Danny
>
>
