Jun Qin created FLINK-16419:
-------------------------------
Summary: Avoid to recommit transactions which are known committed
successfully to Kafka upon recovery
Key: FLINK-16419
URL: https://issues.apache.org/jira/browse/FLINK-16419
Project: Flink
Issue Type: Improvement
Reporter: Jun Qin
When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer
tries to recommit all pre-committed transactions which are in the snapshot,
even if those transactions were successfully committed before (i.e., the call
to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}}
returns OK). This may lead to recovery failures when recovering from a very old
snapshot because the transactional IDs in that snapshot have been expired and
removed from Kafka. For example the following scenario:
# Start a Flink job with FlinkKafkaProducer sink with exactly-once
# Suspend the Flink job with a savepoint A
# Wait for time longer than {{transactional.id.expiration.ms}} +
{{transaction.remove.expired.transaction.cleanup.interval.ms}}
# Recover the job with savepoint A.
# The recovery will fail with the following error:
{noformat}
2020-02-26 14:33:25,817 INFO
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer
- Attempting to resume transaction Source: Custom Source -> Sink:
Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch
1202020-02-26 14:33:25,914 INFO org.apache.kafka.clients.Metadata
- Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
2020-02-26 14:33:26,017 INFO org.apache.kafka.clients.producer.KafkaProducer
- [Producer clientId=producer-1, transactionalId=Source: Custom
Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka
producer with timeoutMillis = 92233720
36854775807 ms.
2020-02-26 14:33:26,019 INFO org.apache.flink.runtime.taskmanager.Task
- Source: Custom Source -> Sink: Unnamed (1/1)
(a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The
producer attempted to use a producer id which is not currently assigned to its
transactional id.
at
org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
at
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
at
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
at java.lang.Thread.run(Thread.java:748)
{noformat}
After discussed with [~becket_qin], [~pnowojski] and [~aljoscha], a possible
way is to let JobManager, after successfully notifies all operators the
completion of a snapshot (via {{notifyCheckpoingComplete}}), record the
success, e.g., write the successful transactional IDs somewhere in the
snapshot. Then those transactions need not recommit upon recovery.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)