Good morning,

Are there any updates on this issue?

BR,
Danny

On Sun, Feb 4, 2024 at 13:20, Daniel Peled <daniel.peled.w...@gmail.com> wrote:

> Hello,
>
> We have noticed that when we add a *new Kafka sink* operator to the
> graph *and start from the last savepoint*, the operator is 100% busy
> for several minutes, and *sometimes for as long as 30 to 60 minutes*.
>
> The problematic code seems to be the following for-loop in the
> getTransactionalProducer() method:
>
>
> *org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer*
>
>     private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
>         checkState(
>                 checkpointId > lastCheckpointId,
>                 "Expected %s > %s",
>                 checkpointId,
>                 lastCheckpointId);
>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>         // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
>         // this loop ensures that all gaps are filled with initialized (empty) transactions
>         for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
>             String transactionalId =
>                     TransactionalIdFactory.buildTransactionalId(
>                             transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
>             producer = getOrCreateTransactionalProducer(transactionalId);
>         }
>         this.lastCheckpointId = checkpointId;
>         assert producer != null;
>         LOG.info("Created new transactional producer {}", producer.getTransactionalId());
>         return producer;
>     }
>
>
> Since we added a new sink operator, its lastCheckpointId is 1.
> If, for example, the checkpointId is 20000, the loop runs once for
> every ID in between, i.e. roughly 20000 times.
>
>
> We have several questions:
> 1. Is this behavior expected?
> 2. Are we doing something wrong?
> 3. Is there a way to avoid this behavior?
>
> Best Regards,
> Danny
>
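
To make the iteration count from the quoted message concrete, here is a minimal sketch of the loop bounds only. The class GapFillSketch and the method countGapFillIterations are hypothetical names used for illustration, not part of Flink, and the loop body just counts where the real code calls getOrCreateTransactionalProducer().

```java
public class GapFillSketch {

    /**
     * Hypothetical stand-in for the loop in getTransactionalProducer():
     * returns how many transactional IDs the loop would visit.
     */
    public static long countGapFillIterations(long lastCheckpointId, long checkpointId) {
        // Mirrors the checkState(...) precondition in the quoted method.
        if (checkpointId <= lastCheckpointId) {
            throw new IllegalStateException(
                    "Expected " + checkpointId + " > " + lastCheckpointId);
        }
        long iterations = 0;
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            // The real loop builds a transactional ID and calls
            // getOrCreateTransactionalProducer(transactionalId) here.
            iterations++;
        }
        return iterations;
    }

    public static void main(String[] args) {
        // New sink restored from a savepoint: lastCheckpointId = 1, checkpointId = 20000.
        System.out.println(countGapFillIterations(1, 20000));     // prints 19999
        // Steady state: consecutive checkpoints need a single iteration.
        System.out.println(countGapFillIterations(19999, 20000)); // prints 1
    }
}
```

With the example values above the loop body runs 19999 times on the first checkpoint after the restore, versus once per checkpoint in steady state.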
