Gary Y. created KAFKA-6119:
------------------------------
Summary: Silent Data Loss in Kafka011 Transactional Producer
Key: KAFKA-6119
URL: https://issues.apache.org/jira/browse/KAFKA-6119
Project: Kafka
Issue Type: Bug
Components: core, producer
Affects Versions: 0.11.0.0, 0.11.0.1
Environment: openjdk version "1.8.0_144"
OpenJDK Runtime Environment (Zulu 8.23.0.3-macosx) (build 1.8.0_144-b01)
OpenJDK 64-Bit Server VM (Zulu 8.23.0.3-macosx) (build 25.144-b01, mixed mode)
Reporter: Gary Y.
Priority: Blocker
Kafka can lose data published by a transactional {{KafkaProducer}} under some
circumstances, i.e., data that should be committed atomically may not be fully
visible from a consumer with {{read_committed}} isolation level.
*Steps to reproduce:*
# Set {{transaction.timeout.ms}} to a low value such as {{100}}
# Publish two messages in one transaction to different partitions of a topic
with a sufficiently long time in-between the messages (e.g., 70 s).
# Only the second message is visible with {{read_committed}} isolation level.
See
https://github.com/GJL/kafka011-transactional-producer-bug-demo/blob/master/src/main/java/com/garyyao/App.java
for a full example. Detailed instructions can be found in the {{README.md}}:
https://github.com/GJL/kafka011-transactional-producer-bug-demo
*Why is this possible?*
Because the transaction timeout is set to a low value, the transaction will be
rolled back quickly after sending the first message. Indeed, in the broker the
following logs could be found:
{code}
[2017-10-25 22:54:58,224] INFO [Transaction Coordinator 0]: Initialized
transactionalId test-producer-1508964897483 with producerId 5 and producer
epoch 0 on partition __transaction_state-10
(kafka.coordinator.transaction.TransactionCoordinator)
[2017-10-25 22:55:24,260] INFO [Transaction Coordinator 0]: Completed rollback
ongoing transaction of transactionalId: test-producer-1508964897483 due to
timeout (kafka.coordinator.transaction.TransactionCoordinator)
{code}
After rollback the second message is sent to a different partition than the
first message.
Upon, transaction commit,
{{org.apache.kafka.clients.producer.internals.TransactionManager}} may enqueue
the request {{addPartitionsToTransactionHandler}}:
{code}
private TransactionalRequestResult beginCompletingTransaction(TransactionResult
transactionResult) {
if (!newPartitionsInTransaction.isEmpty())
enqueueRequest(addPartitionsToTransactionHandler());
EndTxnRequest.Builder builder = new
EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch, transactionResult);
EndTxnHandler handler = new EndTxnHandler(builder);
enqueueRequest(handler);
return handler.result;
}
{code}
As can be seen, the condition is fulfilled if {{newPartitionsInTransaction}} is
non-empty. I suspect because the second message goes to a different partition,
this condition is satisfied.
In {{KafkaApis.scala}}, I can see that {{handleAddPartitionToTxnRequest}}
eventually may call {{prepareAddPartitions}}:
{code}
def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition],
updateTimestamp: Long): TxnTransitMetadata = {
val newTxnStartTimestamp = state match {
case Empty | CompleteAbort | CompleteCommit => updateTimestamp
case _ => txnStartTimestamp
}
prepareTransitionTo(Ongoing, producerId, producerEpoch, txnTimeoutMs,
(topicPartitions ++ addedTopicPartitions).toSet,
newTxnStartTimestamp, updateTimestamp)
}
{code}
Note that the method's first argument {{newState}} of is always *Ongoing* here.
I suspect that this puts the transaction, which should be aborted, to _Ongoing_
again.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)