[ 
https://issues.apache.org/jira/browse/KAFKA-5385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-5385 started by Apurva Mehta.
-------------------------------------------
> Transactional Producer allows batches to expire and commits transactions 
> regardless
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-5385
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5385
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.0
>            Reporter: Apurva Mehta
>            Assignee: Apurva Mehta
>            Priority: Blocker
>              Labels: exactly-once
>             Fix For: 0.11.0.0
>
>
> The transactions system test has revealed a data loss issue. During cluster 
> instability, the transactional requests (AddPartitions and AddOffsets) can 
> retry for a long time. When they eventually succeed, the commit message is 
> dequeued, at which point we try to drain the accumulator. By then the 
> batches are past their expiry time, so we drop them but commit the 
> transaction anyway. This causes data loss. 
> The relevant portion of the producer log is: 
> {noformat}
> [2017-06-06 01:07:36,275] DEBUG [TransactionalId my-first-transactional-id] 
> Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION 
> (org.apache.kafka.clients.producer.internals.TransactionManager)
> [2017-06-06 01:07:36,275] DEBUG [TransactionalId my-first-transactional-id] 
> Enqueuing transactional request (type=EndTxnRequest, 
> transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0, 
> result=COMMIT) 
> (org.apache.kafka.clients.producer.internals.TransactionManager)
> [2017-06-06 01:07:36,276] TRACE Expired 3 batches in accumulator 
> (org.apache.kafka.clients.producer.internals.RecordAccumulator)
> [2017-06-06 01:07:36,286] TRACE Produced messages to topic-partition 
> output-topic-0 with base offset offset -1 and error: {}. 
> (org.apache.kafka.clients.producer.internals.ProducerBatch)
> org.apache.kafka.common.errors.TimeoutException: Expiring 250 record(s) for 
> output-topic-0: 39080 ms has passed since batch creation plus linger time
> [2017-06-06 01:07:36,424] TRACE Produced messages to topic-partition 
> output-topic-1 with base offset offset -1 and error: {}. 
> (org.apache.kafka.clients.producer.internals.ProducerBatch)
> org.apache.kafka.common.errors.TimeoutException: Expiring 250 record(s) for 
> output-topic-1: 39080 ms has passed since batch creation plus linger time
> [2017-06-06 01:07:36,436] TRACE Produced messages to topic-partition 
> output-topic-2 with base offset offset -1 and error: {}. 
> (org.apache.kafka.clients.producer.internals.ProducerBatch)
> org.apache.kafka.common.errors.TimeoutException: Expiring 250 record(s) for 
> output-topic-2: 39080 ms has passed since batch creation plus linger time
> [2017-06-06 01:07:36,444] TRACE [TransactionalId my-first-transactional-id] 
> Request (type=EndTxnRequest, transactionalId=my-first-transactional-id, 
> producerId=1001, producerEpoch=0, result=COMMIT) dequeued for sending 
> (org.apache.kafka.clients.producer.internals.TransactionManager)
> [2017-06-06 01:07:36,446] DEBUG [TransactionalId my-first-transactional-id] 
> Sending transactional request (type=EndTxnRequest, 
> transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0, 
> result=COMMIT) to node knode04:9092 (id: 3 rack: null) 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2017-06-06 01:07:36,449] TRACE [TransactionalId my-first-transactional-id] 
> Received transactional response EndTxnResponse(error=NOT_COORDINATOR, 
> throttleTimeMs=0) for request (type=EndTxnRequest, 
> transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0, 
> result=COMMIT) 
> (org.apache.kafka.clients.producer.internals.TransactionManager)
> {noformat}
> As you can see, the commit goes ahead even though the batches are never sent. 
> In this test, we lost 750 messages in the output topic (3 expired batches of 
> 250 records each), and they correspond exactly to the 750 messages in the 
> input topic at the offset in this portion of the log.
> The solution is either to never expire transactional batches or to fail the 
> transaction if any batches have expired. 
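
The second option above (fail the transaction if any batches have expired) could look roughly like the following. This is a minimal toy sketch, not the actual fix: ExpiredBatchTxnSketch, Batch, expireBatches, and endTransactionResult are hypothetical names, not Kafka's real RecordAccumulator or TransactionManager API, and the 30000 ms expiry threshold is an assumption. The 3 batches of 250 records and the 39080 ms age are taken from the log above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model only: these are hypothetical stand-ins, not Kafka's real
// RecordAccumulator, ProducerBatch, or TransactionManager classes.
final class ExpiredBatchTxnSketch {

    // Hypothetical stand-in for one producer batch.
    static final class Batch {
        final String topicPartition;
        final int recordCount;
        final long createdMs;

        Batch(String topicPartition, int recordCount, long createdMs) {
            this.topicPartition = topicPartition;
            this.recordCount = recordCount;
            this.createdMs = createdMs;
        }
    }

    private final List<Batch> pending = new ArrayList<>();
    private final long expiryMs;      // assumed expiry threshold
    private int expiredRecords = 0;   // records dropped in this transaction

    ExpiredBatchTxnSketch(long expiryMs) {
        this.expiryMs = expiryMs;
    }

    void append(Batch batch) {
        pending.add(batch);
    }

    // Drops batches older than expiryMs, but remembers how many records
    // were lost so the transaction can no longer commit silently.
    int expireBatches(long nowMs) {
        int expiredBatches = 0;
        Iterator<Batch> it = pending.iterator();
        while (it.hasNext()) {
            Batch batch = it.next();
            if (nowMs - batch.createdMs > expiryMs) {
                it.remove();
                expiredRecords += batch.recordCount;
                expiredBatches++;
            }
        }
        return expiredBatches;
    }

    int expiredRecords() {
        return expiredRecords;
    }

    // Decides the EndTxn result: commit only if nothing expired.
    String endTransactionResult() {
        return expiredRecords == 0 ? "COMMIT" : "ABORT";
    }

    public static void main(String[] args) {
        // Assumed 30000 ms expiry; batch age of 39080 ms as in the log.
        ExpiredBatchTxnSketch txn = new ExpiredBatchTxnSketch(30_000L);
        for (int p = 0; p < 3; p++) {
            txn.append(new Batch("output-topic-" + p, 250, 0L));
        }
        int expired = txn.expireBatches(39_080L);
        System.out.println(expired + " batches expired, "
                + txn.expiredRecords() + " records, result="
                + txn.endTransactionResult());
    }
}
```

With the numbers from the log, the sketch records 3 expired batches and 750 dropped records, so it chooses ABORT instead of COMMIT. The point of the design is that expiry leaves a trace the commit path must consult, rather than dropping batches silently.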



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
