William Parker created KAFKA-8674:
-------------------------------------

             Summary: Records that exceed maximum size on the Java Producer don't cause transaction failure
                 Key: KAFKA-8674
                 URL: https://issues.apache.org/jira/browse/KAFKA-8674
             Project: Kafka
          Issue Type: Bug
          Components: producer
    Affects Versions: 2.1.0
            Reporter: William Parker
When using transactions, the [documentation for the Java producer's send method|https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-] states: {code:java} When used as part of a transaction, it is not necessary to define a callback or check the result of the future in order to detect errors from send. If any of the send calls failed with an irrecoverable error, the final commitTransaction() call will fail and throw the exception from the last failed send. When this happens, your application should call abortTransaction() to reset the state and continue to send data. {code} However, when a record exceeds the maximum message size, this is not the behavior we have observed: the commitTransaction call succeeds, and the oversized record is never sent to the broker although other records succeed. The send method returns a KafkaProducer$FutureFailure, and calling .get on that future throws a RecordTooLargeException, but according to the documentation this check should not be necessary. I believe this occurs because the doSend method makes an [ensureValidRecordSize call|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L893] that throws an exception early in its body, before the later calls involving the TransactionManager occur; in effect, the fact that this send was ever attempted is hidden from the TransactionManager.
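The observed behavior can be modeled without a broker. Like the producer's internal FutureFailure, a future that was failed eagerly surfaces its exception only when .get is called, so a caller that skips that check (as the transactional docs say is safe) sees no error at all. A minimal self-contained sketch, assuming nothing beyond the JDK; the RecordTooLargeException class and send method below are illustrative stand-ins, not Kafka's actual code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureFailureSketch {
    // Stand-in for org.apache.kafka.common.errors.RecordTooLargeException
    static class RecordTooLargeException extends RuntimeException {
        RecordTooLargeException(String msg) { super(msg); }
    }

    // Models doSend(): on an oversized record it returns an already-failed
    // future instead of recording the error anywhere else.
    static Future<Long> send(byte[] value, int maxRequestSize) {
        if (value.length > maxRequestSize) {
            return CompletableFuture.failedFuture(
                    new RecordTooLargeException(value.length + " > " + maxRequestSize));
        }
        return CompletableFuture.completedFuture((long) value.length);
    }

    public static void main(String[] args) throws Exception {
        Future<Long> ok = send(new byte[10], 100);
        Future<Long> tooLarge = send(new byte[1000], 100);

        System.out.println("small record size: " + ok.get());
        try {
            tooLarge.get(); // only here does the failure become visible
        } catch (ExecutionException e) {
            System.out.println("large record failed: "
                    + e.getCause().getClass().getSimpleName());
        }
    }
}
```

Until this is fixed, a transactional application that cares about oversized records apparently has to check each returned future itself, contrary to the quoted documentation.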
The logic that actually sends the record(s) appears to be [here|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L899], pasted below: {code:java} if (transactionManager != null && transactionManager.isTransactional()) transactionManager.maybeAddPartitionToTransaction(tp); RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs); if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } {code} Specifically, as I understand the process, the Sender [retrieves data from the RecordAccumulator|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L342]. The [TransactionManager|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java] maintains state indicating whether the transaction has failed, and this state is checked when commitTransaction is called. The state appears to be updated by the Sender, for example in [failBatch|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L686]. The crucial point, however, is that when ensureValidRecordSize throws an exception, this block of interacting code between the RecordAccumulator, the TransactionManager, and the Sender, which would cause the transaction to fail, is never reached. It appears to me that Kafka is not behaving as specified by its docs. Are there thoughts on whether the docs should be changed to reflect this behavior, or whether the ensureValidRecordSize call should be changed to cause the TransactionManager to fail the transaction? Or am I perhaps missing something?
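The control flow above can be sketched in a few lines of self-contained Java. The class and method names below are simplified stand-ins for the producer internals, not Kafka's real code; the point is only that the size check throws before the transaction-manager bookkeeping runs, so the error flag that commit consults is never set:

```java
// Illustrative model of the doSend() / TransactionManager interaction;
// names and structure are simplified, not Kafka's actual internals.
public class EarlyThrowSketch {
    static class KafkaException extends RuntimeException {
        KafkaException(String msg) { super(msg); }
    }

    static class TransactionManager {
        private boolean hasError = false;
        void transitionToAbortableError() { hasError = true; }
        boolean hasError() { return hasError; }
        // commitTransaction() consults this state
        void commit() {
            if (hasError) throw new KafkaException("commit failed: earlier send failed");
        }
    }

    static final int MAX_REQUEST_SIZE = 100;
    final TransactionManager txnManager = new TransactionManager();

    // Models doSend(): the size check throws *before* the transaction
    // manager ever learns this send was attempted.
    void doSend(byte[] value) {
        ensureValidRecordSize(value.length); // throws early
        // ... accumulator.append(...) and the Sender would run here; only
        // failures on that path call txnManager.transitionToAbortableError()
    }

    void ensureValidRecordSize(int size) {
        if (size > MAX_REQUEST_SIZE)
            throw new KafkaException("record too large: " + size);
    }

    public static void main(String[] args) {
        EarlyThrowSketch producer = new EarlyThrowSketch();
        try {
            producer.doSend(new byte[1000]);
        } catch (KafkaException e) {
            // in the real producer this surfaces only via the returned future
        }
        // The transaction manager never saw the failure, so commit succeeds:
        producer.txnManager.commit();
        System.out.println("commit succeeded; hasError=" + producer.txnManager.hasError());
    }
}
```

A fix along the lines suggested above would presumably have ensureValidRecordSize (or its caller) mark the transaction as failed before throwing, so that the commit path sees the error.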
Is this something you'd be interested in taking a patch for? I found this bug on a cluster running Kafka 2.1.0, but the underlying bug seems to exist in the doSend method in [version 2.3.0|https://github.com/apache/kafka/blob/2.3.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L908] and in the [trunk branch at time of writing|https://github.com/apache/kafka/blob/d227f940489434c2f23491340d4399d98fd48d2d/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L908].

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)