[ https://issues.apache.org/jira/browse/FLINK-37356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17929988#comment-17929988 ]
Hongshun Wang edited comment on FLINK-37356 at 2/25/25 3:58 AM:
----------------------------------------------------------------
[~arvid], I now have logs that prove it. At first, a ProducerFencedException is thrown, which means another producer with the same transactional.id exists:
{code:java}
2025-02-25 09:44:55,172 [kafka-producer-network-thread | producer-kafka-sink-0-164] INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-kafka-sink-0-164, transactionalId=kafka-sink-0-164] Transiting to fatal error state due to org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
2025-02-25 09:44:55,173 [Source: DeliveryOrderNotifyJob -> Map -> Map -> Map -> Sink: Writer -> Sink: Committer (1/1)#7] ERROR org.apache.flink.connector.kafka.sink.KafkaCommitter [] - Unable to commit transaction (org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl@6b515922) because its producer is already fenced. This means that you either have a different producer with the same 'transactional.id' (this is unlikely with the 'KafkaSink' as all generated ids are unique and shouldn't be reused) or recovery took longer than 'transaction.timeout.ms' (900000ms). In both cases this most likely signals data loss, please consult the Flink documentation for more details.
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
{code}
{color:#de350b}However, the job does not stop at this point.{color} The producer then initializes a transaction again {color:#de350b}(strangely, still with the same transactionalId=kafka-sink-0-164, which is consistent with the fenced producer being recycled and reused){color}:
{code:java}
2025-02-25 09:47:55,302 [Source: DeliveryOrderNotifyJob -> Map -> Map -> Map -> Sink: Writer -> Sink: Committer (1/1)#7] INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-kafka-sink-0-164, transactionalId=kafka-sink-0-164] Invoking InitProducerId for the first time in order to acquire a producer ID
2025-02-25 09:47:55,307 [kafka-producer-network-thread | producer-kafka-sink-0-164] INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-kafka-sink-0-164, transactionalId=kafka-sink-0-164] ProducerId set to 174137 with epoch 0
{code}
Then the producer attempts a transactional operation in an invalid state:
{code:java}
2025-02-25 09:56:57,609 [jobmanager-io-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline checkpoint 353 by task e4888c74ef32c4d0c170727853063e00_cbc357ccb763df2852fee8c4fc7d55f2_0_7 of job 6df008868f2943ac8850bf926553a0a9 at job-6df00886-8f29-43ac-8850-bf926553a0a9-taskmanager-1-1 @ 21.224.27.157 (dataPort=38961).
org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Task name with subtask : Source: DeliveryOrderNotifyJob -> Map -> Map -> Map -> Sink: Writer -> Sink: Committer (1/1)#7 Failure reason: Task has failed.
2025-02-25 09:50:56,583 [kafka-producer-network-thread | producer-kafka-sink-0-164] INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-kafka-sink-0-164, transactionalId=kafka-sink-0-164] Transiting to fatal error state due to org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
2025-02-25 09:50:56,583 [Source: DeliveryOrderNotifyJob -> Map -> Map -> Map -> Sink: Writer -> Sink: Committer (1/1)#7] ERROR org.apache.flink.connector.kafka.sink.KafkaCommitter [] - Unable to commit transaction (org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl@61f62203) because it's in an invalid state. Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
{code}
The job then fails with:
{code:java}
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not perform checkpoint 353 for operator Source: DeliveryOrderNotifyJob -> Map -> Map -> Map -> Sink: Writer -> Sink: Committer (1/1)#7.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1434) ~[flink-dist-1.17-vvr-8.0.11-1-SNAPSHOT.jar:1.17-vvr-8.0.11-1-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$13(StreamTask.java:1381) ~[flink-dist-1.17-vvr-8.0.11-1-SNAPSHOT.jar:1.17-vvr-8.0.11-1-SNAPSHOT]
    ... 13 more
Caused by: org.apache.flink.util.SerializedThrowable: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
{code}
and then fails again:
{code:java}
org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
    at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:442) ~[?:?]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:949) ~[?:?]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:886) ~[?:?]
    at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:80) ~[?:?]
    at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200) ~[?:?]
{code}
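For context, the fencing in the first log is ordinary Kafka behavior whenever two producers share a transactional.id. A minimal standalone sketch that reproduces the ProducerFencedException (the broker address and topic name are placeholders, not taken from the logs above):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class FencingDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("transactional.id", "kafka-sink-0-164"); // same id as in the logs

        KafkaProducer<String, String> p1 =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
        p1.initTransactions();
        p1.beginTransaction();
        p1.send(new ProducerRecord<>("demo-topic", "key", "value"));

        // A second producer with the same transactional.id bumps the epoch
        // on the broker and fences p1.
        KafkaProducer<String, String> p2 =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
        p2.initTransactions();

        try {
            p1.commitTransaction(); // throws ProducerFencedException
        } catch (ProducerFencedException e) {
            // p1's TransactionManager is now in a fatal error state; the only
            // safe call left on p1 is close(). Reusing it for a new transaction
            // is exactly the situation this ticket is about.
            p1.close();
        }
        p2.close();
    }
}
{code}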
> Recycled Kafka producer (which hit a commit error) may send data without
> AddPartitionsToTxnRequest
> ---------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37356
>                 URL: https://issues.apache.org/jira/browse/FLINK-37356
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: kafka-3.4.0
>            Reporter: Hongshun Wang
>            Priority: Major
>
> In my production environment, a READ_COMMITTED consumer could no longer consume any records, and I found that the LSO of the partition had not changed for a long time. I searched all the logs in the Kafka cluster and found a transaction that lacked an AddPartitionsToTxnRequest.
>
> At first, I thought the problem was caused by https://issues.apache.org/jira/browse/FLINK-31363, because my Kafka cluster log contains InvalidTxnStateException. However, even though that transaction is in an invalid state, no data is written to any Kafka topic partition in it (in that case the transaction is empty). It therefore does not influence any partition's LSO, so the consumer would not be blocked by it.
>
> Then I checked the Kafka client code: there seems to be no way to produce data without the AddPartitionsToTxnRequest being done, because the `Sender` refuses to dequeue batches from the accumulator until they have been added to the transaction.
> {code:java}
> // org.apache.kafka.clients.producer.KafkaProducer#doSend
> private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
>     // ... code omitted ...
>     // Add the partition to the transaction (if in progress) after it has been successfully
>     // appended to the accumulator. We cannot do it before because the partition may be
>     // unknown or the initially selected partition may be changed when the batch is closed
>     // (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse to dequeue
>     // batches from the accumulator until they have been added to the transaction.
>     if (transactionManager != null) {
>         transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
>     }
>     // ... code omitted ...
> }
> {code}
> {code:java}
> // org.apache.kafka.clients.producer.internals.RecordAccumulator#drainBatchesForOneNode
> if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
>     // there is a rare case that a single batch size is larger than the request size due to
>     // compression; in this case we will still eventually send this batch in a single request
>     break;
> } else {
>     if (shouldStopDrainBatchesForPartition(first, tp))
>         break;
> }
> {code}
>
> This gave me an idea: if a TransactionManager that has not cleared its partitionsInTransaction is reused, the AddPartitionsToTxnRequest won't be sent again, as the paraphrased check below shows.
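> The short-circuit lives in the client's TransactionManager. The following is a paraphrase of the Kafka 3.x client, abridged; the exact method and field names may differ between kafka-clients versions, so treat this as a sketch rather than a verbatim quote:
> {code:java}
> // Paraphrase of org.apache.kafka.clients.producer.internals.TransactionManager#maybeAddPartition
> public synchronized void maybeAddPartition(TopicPartition topicPartition) {
>     // ... error-state and producer-id checks omitted ...
>     if (transactionContainsPartition(topicPartition) || isPartitionPendingAdd(topicPartition)) {
>         // The partition is already tracked in partitionsInTransaction (possibly
>         // stale state from a previous, failed transaction), so no new
>         // AddPartitionsToTxnRequest is enqueued for it.
>         return;
>     }
>     newPartitionsInTransaction.add(topicPartition);
> }
> {code}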
> This can happen in the Flink Kafka connector:
>
> 1. The Flink Kafka connector reuses and recycles the KafkaProducer: KafkaCommitter recycles the producer back to the producerPool after the transaction completes or fails with an exception, and KafkaWriter later takes it from the producerPool again.
> {code:java}
> // org.apache.flink.connector.kafka.sink.KafkaCommitter#commit (abridged)
> @Override
> public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
>         throws IOException, InterruptedException {
>     for (CommitRequest<KafkaCommittable> request : requests) {
>         try {
>             // ... obtain the producer for this committable (omitted) ...
>             producer.commitTransaction();
>             producer.flush();
>             recyclable.ifPresent(Recyclable::close);
>         } catch (RetriableException e) {
>             request.retryLater();
>         } catch (ProducerFencedException e) {
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithKnownReason(e);
>         } catch (InvalidTxnStateException e) {
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithKnownReason(e);
>         } catch (UnknownProducerIdException e) {
>             LOG.error(...); // logging elided
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithKnownReason(e);
>         } catch (Exception e) {
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithUnknownReason(e);
>         }
>     }
> }
> {code}
> 2. If KafkaCommitter hits an exception and does not succeed in committing the transaction, the partitionsInTransaction in the TransactionManager is not cleared (clearing only happens in org.apache.kafka.clients.producer.internals.TransactionManager#completeTransaction).
>
> 3. If KafkaWriter then reuses the same producer and sends data to the same partitions in the next transaction, the AddPartitionsToTxnRequest won't be sent.
>
> Thus, in FlinkKafkaInternalProducer#setTransactionId, we should also clear the partition information of the TransactionManager (currently we only set transactionalId and currentState):
> {code:java}
> public void setTransactionId(String transactionalId) {
>     if (!transactionalId.equals(this.transactionalId)) {
>         checkState(
>                 !inTransaction,
>                 String.format("Another transaction %s is still open.", transactionalId));
>         LOG.debug("Change transaction id from {} to {}", this.transactionalId, transactionalId);
>         Object transactionManager = getTransactionManager();
>         synchronized (transactionManager) {
>             setField(transactionManager, "transactionalId", transactionalId);
>             setField(
>                     transactionManager,
>                     "currentState",
>                     getTransactionManagerState("UNINITIALIZED"));
>             this.transactionalId = transactionalId;
>         }
>     }
> }
> {code}
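> For what it's worth, here is a minimal sketch of such a fix. The reflection helper and the three field names (newPartitionsInTransaction, pendingPartitionsInTransaction, partitionsInTransaction) are assumptions based on the Kafka 3.x TransactionManager and would need to be verified against the kafka-clients version the connector ships with:
> {code:java}
> // Hypothetical extension of FlinkKafkaInternalProducer#setTransactionId:
> // after resetting transactionalId and currentState, also drop any partition
> // state left over from the previous (failed) transaction, so the next
> // transaction sends AddPartitionsToTxnRequest again.
> public void setTransactionId(String transactionalId) {
>     if (!transactionalId.equals(this.transactionalId)) {
>         checkState(
>                 !inTransaction,
>                 String.format("Another transaction %s is still open.", transactionalId));
>         Object transactionManager = getTransactionManager();
>         synchronized (transactionManager) {
>             setField(transactionManager, "transactionalId", transactionalId);
>             setField(
>                     transactionManager,
>                     "currentState",
>                     getTransactionManagerState("UNINITIALIZED"));
>             // Assumed field names; check against the actual client version.
>             clearPartitionSet(transactionManager, "newPartitionsInTransaction");
>             clearPartitionSet(transactionManager, "pendingPartitionsInTransaction");
>             clearPartitionSet(transactionManager, "partitionsInTransaction");
>             this.transactionalId = transactionalId;
>         }
>     }
> }
>
> // Hypothetical helper: empties a private Set-typed field via reflection,
> // mirroring the existing setField/getTransactionManager reflection style.
> private static void clearPartitionSet(Object transactionManager, String fieldName) {
>     try {
>         java.lang.reflect.Field field =
>                 transactionManager.getClass().getDeclaredField(fieldName);
>         field.setAccessible(true);
>         ((java.util.Set<?>) field.get(transactionManager)).clear();
>     } catch (NoSuchFieldException | IllegalAccessException e) {
>         throw new IllegalStateException("Incompatible kafka-clients version", e);
>     }
> }
> {code}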