William Parker created KAFKA-8674:
-------------------------------------

             Summary: Records that exceed maximum size on the Java Producer 
don't cause transaction failure
                 Key: KAFKA-8674
                 URL: https://issues.apache.org/jira/browse/KAFKA-8674
             Project: Kafka
          Issue Type: Bug
          Components: producer 
    Affects Versions: 2.1.0
            Reporter: William Parker


When using transactions, the [documentation for the Java producer's send 
method|https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]
 states that
{code:java}
When used as part of a transaction, it is not necessary to define a callback or 
check the result of the future in order to detect errors from send. If any of 
the send calls failed with an irrecoverable error, the final 
commitTransaction() call will fail and throw the exception from the last failed 
send. When this happens, your application should call abortTransaction() to 
reset the state and continue to send data.
{code}
However, when a record exceeds the maximum message size, this is not the behavior 
we have observed; rather, the commitTransaction call succeeds and the oversized 
record is never sent to the broker, although other records in the transaction are 
delivered. The send method returns a KafkaProducer$FutureFailure, and calling .get 
on that future throws a RecordTooLargeException, but according to the 
documentation this check should not be necessary.
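
For concreteness, here is a minimal reproduction sketch. It assumes a broker at 
localhost:9092, a topic named "test", and the default max.request.size of 1 MB; 
these names and values are illustrative only:
{code:java}
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class OversizedRecordRepro {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "repro-tx");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            // Exceeds the default 1 MB max.request.size, so this send fails immediately.
            Future<RecordMetadata> failed =
                    producer.send(new ProducerRecord<>("test", new byte[2 * 1024 * 1024]));
            // A normal-sized record in the same transaction.
            producer.send(new ProducerRecord<>("test", new byte[10]));
            // Per the docs this should throw the exception from the failed send;
            // observed: it succeeds, and only the small record is written.
            producer.commitTransaction();
            // The failure is only visible by inspecting the future directly:
            failed.get(); // throws ExecutionException wrapping RecordTooLargeException
        }
    }
}
{code}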

I believe this occurs because the doSend method has an [ensureValidRecordSize 
call|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L893]
 which throws an exception early in its body, before later calls involving the 
TransactionManager occur; in essence the fact that this send was attempted is 
hidden from the TransactionManager. The logic that actually sends the record(s) 
appears to be 
[here|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L899],
pasted below:
{code:java}
if (transactionManager != null && transactionManager.isTransactional())
    transactionManager.maybeAddPartitionToTransaction(tp);

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch",
            record.topic(), partition);
    this.sender.wakeup();
}
{code}
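For reference, the size check that short-circuits before any of the above runs 
looks roughly like this in 2.1.0 (paraphrased from the linked source; see the 
link above for the exact text):
{code:java}
// Paraphrase of KafkaProducer.ensureValidRecordSize in 2.1.0: both branches
// throw before the TransactionManager is ever consulted about this send.
private void ensureValidRecordSize(int size) {
    if (size > this.maxRequestSize)
        throw new RecordTooLargeException("The message is " + size +
                " bytes when serialized which is larger than the maximum request size you have configured with the " +
                ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
    if (size > this.totalMemorySize)
        throw new RecordTooLargeException("The message is " + size +
                " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                ProducerConfig.BUFFER_MEMORY_CONFIG + " configuration.");
}
{code}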
Specifically, as I understand this process, the Sender [retrieves data from the 
RecordAccumulator|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L342].
 The 
[TransactionManager|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java]
 maintains state that indicates whether the transaction has failed, and when 
the commitTransaction call is made this state is checked. This state appears to 
be updated by the Sender, for example in 
[failBatch|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L686].

The crucial point, however, is that when ensureValidRecordSize throws an 
exception, this interacting code (between the RecordAccumulator, the 
TransactionManager, and the Sender) that would cause the transaction to fail is 
never reached.
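
Until this is resolved, a defensive workaround is for the application to check 
every send future itself before committing. A sketch (producer and records are 
assumed to come from the surrounding application code; imports omitted):
{code:java}
// Workaround sketch: surface errors such as RecordTooLargeException that
// ensureValidRecordSize raised before the TransactionManager saw the send.
List<Future<RecordMetadata>> pending = new ArrayList<>();
producer.beginTransaction();
for (ProducerRecord<byte[], byte[]> record : records)
    pending.add(producer.send(record));
try {
    for (Future<RecordMetadata> future : pending)
        future.get(); // throws if the corresponding send failed
    producer.commitTransaction();
} catch (InterruptedException | ExecutionException e) {
    producer.abortTransaction();
    throw new KafkaException("Transactional send failed", e);
}
{code}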

It appears to me that Kafka is not behaving as specified by its docs. Are there 
thoughts on whether the docs should be changed to reflect this behavior, or 
whether the ensureValidRecordSize call should be changed to cause the 
TransactionManager to fail the transaction? Alternatively, am I perhaps missing 
something? Is this something you'd be interested in taking a patch for?
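
If a patch would be welcome, one possible direction, sketched only, is to notify 
the transaction manager from doSend's existing catch block for ApiException. Note 
that maybeTransitionToErrorState below is a hypothetical method name: 
TransactionManager's existing error-transition methods are package-private to the 
internals package, so a new public entry point would be needed:
{code:java}
// Inside KafkaProducer.doSend, extending the existing ApiException handler
// (surrounding lines shown for context; the two marked lines are the sketch):
} catch (ApiException e) {
    log.debug("Exception occurred during message send:", e);
    if (callback != null)
        callback.onCompletion(null, e);
    this.errors.record();
    this.interceptors.onSendError(record, tp, e);
    // Sketch: record the failure so a later commitTransaction() throws
    // instead of silently succeeding.
    if (transactionManager != null && transactionManager.isTransactional())  // sketch
        transactionManager.maybeTransitionToErrorState(e);  // hypothetical method, sketch
    return new FutureFailure(e);
}
{code}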

I found this bug on a cluster running Kafka 2.1.0, but the underlying bug seems 
to exist in the doSend method in [version 
2.3.0|https://github.com/apache/kafka/blob/2.3.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L908]
 and in the [trunk branch at time of 
 writing|https://github.com/apache/kafka/blob/d227f940489434c2f23491340d4399d98fd48d2d/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L908].