[
https://issues.apache.org/jira/browse/KAFKA-5385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Work on KAFKA-5385 started by Apurva Mehta.
-------------------------------------------
> Transactional Producer allows batches to expire and commits transactions
> regardless
> -----------------------------------------------------------------------------------
>
> Key: KAFKA-5385
> URL: https://issues.apache.org/jira/browse/KAFKA-5385
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.11.0.0
> Reporter: Apurva Mehta
> Assignee: Apurva Mehta
> Priority: Blocker
> Labels: exactly-once
> Fix For: 0.11.0.0
>
>
> The transactions system test has revealed a data loss issue. When there is
> cluster instability, the transactional requests (AddPartitions and AddOffsets)
> can retry for a long time. When they eventually succeed, the commit request is
> dequeued and we try to drain the accumulator. At that point we find that the
> batches have already expired, so we drop them, yet we commit the transaction
> anyway. This causes data loss.
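> Here is a minimal sketch of how this looks from the public producer API; the
> bootstrap address, topic name, and callback handling are placeholders, and the
> point is only to show where the expired sends and the commit interact:
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> public class ExpiredBatchCommitRepro {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");            // placeholder
>         props.put("transactional.id", "my-first-transactional-id");
>         props.put("key.serializer", StringSerializer.class.getName());
>         props.put("value.serializer", StringSerializer.class.getName());
>
>         KafkaProducer<String, String> producer = new KafkaProducer<>(props);
>         producer.initTransactions();
>         producer.beginTransaction();
>
>         for (int i = 0; i < 750; i++) {
>             // Under cluster instability these batches can sit in the accumulator
>             // long enough to expire; the callback then sees a TimeoutException
>             // like the ones in the log below.
>             producer.send(new ProducerRecord<>("output-topic", Integer.toString(i)),
>                     (metadata, exception) -> {
>                         if (exception != null)
>                             System.err.println("Send failed: " + exception);
>                     });
>         }
>
>         // With the behaviour described here, the commit still goes through even
>         // though the batches above were expired and never written.
>         producer.commitTransaction();
>         producer.close();
>     }
> }
> {code}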
> Relevant portion from the producer log is here:
> {noformat}
> [2017-06-06 01:07:36,275] DEBUG [TransactionalId my-first-transactional-id]
> Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION
> (org.apache.kafka.clients.producer.internals.TransactionManager)
> [2017-06-06 01:07:36,275] DEBUG [TransactionalId my-first-transactional-id]
> Enqueuing transactional request (type=EndTxnRequest,
> transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0,
> result=COMMIT)
> (org.apache.kafka.clients.producer.internals.TransactionManager)
> [2017-06-06 01:07:36,276] TRACE Expired 3 batches in accumulator
> (org.apache.kafka.clients.producer.internals.RecordAccumulator)
> [2017-06-06 01:07:36,286] TRACE Produced messages to topic-partition
> output-topic-0 with base offset offset -1 and error: {}.
> (org.apache.kafka.clients.producer.internals.ProducerBatch)
> org.apache.kafka.common.errors.TimeoutException: Expiring 250 record(s) for
> output-topic-0: 39080 ms has passed since batch creation plus linger time
> [2017-06-06 01:07:36,424] TRACE Produced messages to topic-partition
> output-topic-1 with base offset offset -1 and error: {}.
> (org.apache.kafka.clients.producer.internals.ProducerBatch)
> org.apache.kafka.common.errors.TimeoutException: Expiring 250 record(s) for
> output-topic-1: 39080 ms has passed since batch creation plus linger time
> [2017-06-06 01:07:36,436] TRACE Produced messages to topic-partition
> output-topic-2 with base offset offset -1 and error: {}.
> (org.apache.kafka.clients.producer.internals.ProducerBatch)
> org.apache.kafka.common.errors.TimeoutException: Expiring 250 record(s) for
> output-topic-2: 39080 ms has passed since batch creation plus linger time
> [2017-06-06 01:07:36,444] TRACE [TransactionalId my-first-transactional-id]
> Request (type=EndTxnRequest, transactionalId=my-first-transactional-id,
> producerId=1001, producerEpoch=0, result=COMMIT) dequeued for sending
> (org.apache.kafka.clients.producer.internals.TransactionManager)
> [2017-06-06 01:07:36,446] DEBUG [TransactionalId my-first-transactional-id]
> Sending transactional request (type=EndTxnRequest,
> transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0,
> result=COMMIT) to node knode04:9092 (id: 3 rack: null)
> (org.apache.kafka.clients.producer.internals.Sender)
> [2017-06-06 01:07:36,449] TRACE [TransactionalId my-first-transactional-id]
> Received transactional response EndTxnResponse(error=NOT_COORDINATOR,
> throttleTimeMs=0) for request (type=EndTxnRequest,
> transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0,
> result=COMMIT)
> (org.apache.kafka.clients.producer.internals.TransactionManager)
> {noformat}
> As you can see, the commit goes ahead even though the batches were never sent.
> In this test we lost 750 messages in the output topic, and they correspond
> exactly to the 750 messages in the input topic at the offsets covered by this
> portion of the log.
> The solution is either to never expire transactional batches, or to fail the
> transaction if any of its batches have expired.
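> A rough, self-contained sketch of the second option (failing the transaction
> when any of its batches has expired); the class and method names below are
> illustrative only and are not the actual producer internals:
> {code:java}
> import java.util.List;
>
> public class CommitGuardSketch {
>
>     enum TxnState { IN_TRANSACTION, COMMITTING_TRANSACTION, ABORTABLE_ERROR }
>
>     static class Batch {
>         final String topicPartition;
>         boolean expired; // set by the accumulator when the batch times out
>         Batch(String topicPartition) { this.topicPartition = topicPartition; }
>     }
>
>     TxnState state = TxnState.IN_TRANSACTION;
>
>     // Before enqueuing EndTxnRequest(result=COMMIT), verify that no batch in the
>     // transaction was expired and dropped; otherwise committing loses records.
>     void commitTransaction(List<Batch> batchesInTransaction) {
>         for (Batch batch : batchesInTransaction) {
>             if (batch.expired) {
>                 state = TxnState.ABORTABLE_ERROR;
>                 throw new IllegalStateException("Cannot commit: batch for "
>                         + batch.topicPartition + " expired before it was sent");
>             }
>         }
>         state = TxnState.COMMITTING_TRANSACTION;
>         // ... enqueue EndTxnRequest(result=COMMIT) here ...
>     }
> }
> {code}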