Matthias Schwalbe created FLINK-23509:
-----------------------------------------

             Summary: FlinkKafkaInternalProducer overrides static final ProducerIdAndEpoch#NONE during transaction recovery (fails)
                 Key: FLINK-23509
                 URL: https://issues.apache.org/jira/browse/FLINK-23509
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.13.0
            Reporter: Matthias Schwalbe


When recovering Kafka transactions from a snapshot, FlinkKafkaInternalProducer overrides static final ProducerIdAndEpoch#NONE here:

[FlinkKafkaInternalProducer#resumeTransaction|https://github.com/apache/flink/blob/f06faf13930f2e8acccf1e04e2c250b85bdbf48e/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java#L229]

and consequently TransactionManager initializes transactions as new transactions instead of recovered ones. Here:

[TransactionManager#initializeTransactions|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L332]

TransactionManager log (edited for readability):

{{[Sink: trxRollupKafkaSink (1/1)#3|#3] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Overriding the default enable.idempotence to true since transactional.id is specified.
[Sink: trxRollupKafkaSink (1/1)#3|#3] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Instantiated a transactional producer.
[Sink: trxRollupKafkaSink (1/1)#3|#3] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
[Sink: trxRollupKafkaSink (1/1)#3|#3] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Overriding the default acks to all since idempotence is enabled.
[Sink: trxRollupKafkaSink (1/1)#3|#3] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Transition from state UNINITIALIZED to INITIALIZING
[Sink: trxRollupKafkaSink (1/1)#3|#3] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Invoking InitProducerId for the first time in order to acquire a producer ID
[Sink: trxRollupKafkaSink (1/1)#3|#3] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Enqueuing transactional request InitProducerIdRequestData(transactionalId='Sink: trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118, producerEpoch=17)
[kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] TRACE org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request InitProducerIdRequestData(transactionalId='Sink: trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118, producerEpoch=17) dequeued for sending
[kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Enqueuing transactional request FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2', keyType=1)
[kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Enqueuing transactional request InitProducerIdRequestData(transactionalId='Sink: trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118, producerEpoch=17)
[kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] TRACE org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2', keyType=1) dequeued for sending
[kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] TRACE org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Received transactional response FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=3, host='ulxxtkafbrk03.adgr.net', port=9093) for request FindCoordinatorRequestData(key='Sink: trxRollupKafkaSink-...8b6-2', keyType=1)
[kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Discovered transaction coordinator ulxxtkafbrk03.adgr.net:9093 (id: 3 rack: null)
[kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] TRACE org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Request InitProducerIdRequestData(transactionalId='Sink: trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118, producerEpoch=17) dequeued for sending
[kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] TRACE org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Received transactional response InitProducerIdResponseData(throttleTimeMs=0, errorCode=47, producerId=-1, producerEpoch=-1) for request InitProducerIdRequestData(transactionalId='Sink: trxRollupKafkaSink-...8b6-2', transactionTimeoutMs=60000, producerId=1545118, producerEpoch=17)
[kafka-producer-network-thread | producer-Sink: trxRollupKafkaSink-...8b6-2] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Transition from state INITIALIZING to error state FATAL_ERROR
[Sink: trxRollupKafkaSink (1/1)#3|#3] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-Sink: trxRollupKafkaSink-...8b6-2, transactionalId=Sink: trxRollupKafkaSink-...8b6-2] Closing the Kafka producer with timeoutMillis = 0 ms.
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at java.lang.Thread.run(Thread.java:748)
}}

...

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
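
Illustration of the override described in this report: a minimal, self-contained sketch that uses hypothetical stand-in classes, not the real Kafka or Flink types. The names IdAndEpoch, Manager, overwriteInPlace and replaceReference are made up for this example. It assumes only what the description states: the manager initially points at a shared static final sentinel, and recovery writes the snapshot values into that object via reflection. Under that assumption the sentinel itself ends up carrying the recovered id and epoch, while an identity check against the sentinel still reports "uninitialized".

```java
import java.lang.reflect.Field;

public class SharedSentinelSketch {

    /** Hypothetical stand-in for an immutable (producerId, epoch) pair with a shared sentinel. */
    public static final class IdAndEpoch {
        public static final IdAndEpoch NONE = new IdAndEpoch(-1L, (short) -1);
        public final long producerId;
        public final short epoch;

        public IdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    /** Hypothetical stand-in for a manager whose state starts out as the sentinel. */
    public static final class Manager {
        public IdAndEpoch current = IdAndEpoch.NONE;

        /** Identity check against the sentinel, as a "no producer id yet" test. */
        public boolean looksUninitialized() {
            return current == IdAndEpoch.NONE;
        }
    }

    /** Problematic pattern: mutate whatever object the manager currently points at. */
    public static void overwriteInPlace(Manager m, long id, short epoch) {
        setFinalField(m.current, "producerId", id);
        setFinalField(m.current, "epoch", epoch);
    }

    /** Alternative pattern: leave the sentinel untouched and swap the reference. */
    public static void replaceReference(Manager m, long id, short epoch) {
        m.current = new IdAndEpoch(id, epoch);
    }

    private static void setFinalField(Object target, String name, Object value) {
        try {
            Field f = target.getClass().getDeclaredField(name);
            f.setAccessible(true); // permits writing a non-static final instance field
            f.set(target, value);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("reflection failed for field " + name, e);
        }
    }

    public static void main(String[] args) {
        // Swapping the reference keeps the sentinel intact.
        Manager safe = new Manager();
        replaceReference(safe, 1545118L, (short) 17);
        System.out.println("after replaceReference: NONE.producerId=" + IdAndEpoch.NONE.producerId
                + ", looksUninitialized=" + safe.looksUninitialized());

        // Writing in place changes the sentinel for every user of NONE.
        Manager broken = new Manager();
        overwriteInPlace(broken, 1545118L, (short) 17);
        System.out.println("after overwriteInPlace: NONE.producerId=" + IdAndEpoch.NONE.producerId
                + ", looksUninitialized=" + broken.looksUninitialized());
    }
}
```

In this sketch, replacing the reference avoids touching the shared instance; whether a comparable change is the right fix for FlinkKafkaInternalProducer is not established by this report.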