[ 
https://issues.apache.org/jira/browse/KAFKA-13694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17498234#comment-17498234
 ] 

Guozhang Wang commented on KAFKA-13694:
---------------------------------------

Hello [~RivenSun] thanks for the report. If I understand you correctly, we are 
trying to solve two issues here:

1) Have the broker print a meaningful error message when validation fails.
2) Have the broker send that error message all the way back to the client, so 
that the client can also print a meaningful message.

For 1), I think it's definitely a win. For that purpose, I think we can 
simply replace the `processFailedRecord(topicPartition, 
rve.invalidException)` call with `processFailedRecord(topicPartition, rve)`, 
which should then include both the invalidException and the recordErrors 
in the error message.
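
As a rough illustration of 1), here is a minimal, self-contained sketch of a 
log line that carries both the top-level message and the per-record errors. 
`RecordError`, `describe` and the sample error text are stand-ins defined 
only for this example; they are assumptions, not the actual Kafka classes or 
broker code.

```java
import java.util.List;

public class ValidationFailureMessage {
    // Hypothetical stand-in for a per-record error: the record's index in the
    // batch plus an optional message. Not the real Kafka class.
    static final class RecordError {
        final int batchIndex;
        final String message;

        RecordError(int batchIndex, String message) {
            this.batchIndex = batchIndex;
            this.message = message;
        }
    }

    // Builds one log line from the top-level message and every per-record error.
    static String describe(String topLevelMessage, List<RecordError> recordErrors) {
        StringBuilder sb = new StringBuilder(topLevelMessage);
        if (!recordErrors.isEmpty()) {
            sb.append(" (record errors: ");
            for (int i = 0; i < recordErrors.size(); i++) {
                RecordError e = recordErrors.get(i);
                if (i > 0) {
                    sb.append("; ");
                }
                sb.append("batchIndex=").append(e.batchIndex)
                  .append(", message=").append(e.message);
            }
            sb.append(")");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The per-record text below is made-up sample data.
        System.out.println(describe(
            "One or more records have been rejected",
            List.of(new RecordError(0, "record has no key (sample text)"))));
    }
}
```

With something along these lines, the broker log would name the offending 
records instead of only saying that the batch was rejected.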

For 2), what I saw is that today we do put the `recordErrors` into the 
`PartitionResponse`, so they are encoded back to the client and should 
surface through the `producer.Callback`. So if the user implements a 
Callback, the errors should be observable; or did I miss something?
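
To show what I mean by observing it on the client side, here is a small 
sketch of a callback that records whatever exception a failed send reports. 
`SendCallback` is a simplified stand-in I am assuming for this example; the 
real `org.apache.kafka.clients.producer.Callback` has the signature 
`onCompletion(RecordMetadata metadata, Exception exception)`. 
`LoggingCallback` and the sample exception are also hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class CallbackSketch {
    // Simplified stand-in for org.apache.kafka.clients.producer.Callback.
    // The real interface takes (RecordMetadata metadata, Exception exception).
    interface SendCallback {
        void onCompletion(Object metadata, Exception exception);
    }

    // Collects the failure reason of each unsuccessful send so that the
    // application can log or inspect it.
    static final class LoggingCallback implements SendCallback {
        final List<String> failures = new ArrayList<>();

        @Override
        public void onCompletion(Object metadata, Exception exception) {
            // A non-null exception means the send failed.
            if (exception != null) {
                failures.add(exception.getClass().getSimpleName()
                    + ": " + exception.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        LoggingCallback callback = new LoggingCallback();
        // Simulate a rejected send; the exception text is made-up sample data.
        callback.onCompletion(null, new IllegalStateException("record 0 was rejected"));
        callback.failures.forEach(System.err::println);
    }
}
```

Whether the message that reaches such a callback is specific enough depends 
on what the broker puts into the response, which is the question above.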

> Some InvalidRecordException messages are thrown away
> ----------------------------------------------------
>
>                 Key: KAFKA-13694
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13694
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients, core
>    Affects Versions: 3.0.0
>            Reporter: RivenSun
>            Priority: Major
>
> 1.Example
> Topic level config:"cleanup.policy":"compact" 
> But when the producer sends the message, the ProducerRecord does not specify 
> the key.
>  
> producer.log
> {code:java}
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:One 
> or more records have been rejected {code}
>  
>  
> server.log
> {code:java}
> [2022-02-25 02:14:54,411] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition rivenTest4-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.InvalidRecordException: One or more records have been 
> rejected {code}
> From the producer and server logs, we cannot tell why the send failed, only 
> that the message was rejected by the server.
> Compare this with the RecordTooLargeException test case: there the producer 
> log clearly states the reason for the failure, and the server does not print 
> a log (the reason is explained later).
> producer_message_too_large.log :
> {code:java}
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The 
> request included a message larger than the max message size the server will 
> accept.
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The 
> request included a message larger than the max message size the server will 
> accept. {code}
> 2.RootCause
> ReplicaManager#appendToLocalLog(...) ->
> Partition#appendRecordsToLeader(...) ->
> UnifiedLog#appendAsLeader(...) -> UnifiedLog#append(...) ->
> LogValidator#validateMessagesAndAssignOffsets(...) 
> 1) Analyze the validateMessagesAndAssignOffsets method.
> In the LogValidator#validateRecord method, validateKey and validateTimestamp 
> are called, and the errors for all messages are collected as 
> Seq[ApiRecordError].
> The subsequent processRecordErrors(recordErrors) method currently 
> special-cases only Errors.INVALID_TIMESTAMP. Because the error returned by 
> validateKey is still the ordinary Errors.INVALID_RECORD, the code falls 
> through to
> {code:java}
> else {
>   throw new RecordValidationException(new InvalidRecordException(
>     "One or more records have been rejected"), errors)
> }{code}
> In fact, the *errors* variable here contains the details of each 
> recordError, but we do not put them into the message of 
> InvalidRecordException.
> 2) The exception thrown by processRecordErrors is caught by 
> ReplicaManager#appendToLocalLog(...), so we continue by analyzing the 
> exception-handling code of appendToLocalLog.
> Here we can also see why the server does not print a log for 
> RecordTooLargeException.
> In the `case rve: RecordValidationException` branch, the server prints the 
> log via the processFailedRecord method and sends a response to the client 
> via LogAppendResult.
> Both use only rve.invalidException; rve.recordErrors is neither printed by 
> the server nor returned to the client.
> 3.Solution
> Two solutions; I prefer the second.
> 1) Similar to Errors.INVALID_TIMESTAMP, have the validateKey method return 
> Errors.INVALID_RECORD_WITHOUT_KEY, and special-case 
> Errors.INVALID_RECORD_WITHOUT_KEY in the processRecordErrors method as well.
> 2) Modify the processRecordErrors method so that it no longer distinguishes 
> between the types of Errors, and {*}even if new INVALID_RECORD types are 
> added in the future{*}, we uniformly throw:
> {code:java}
> throw new RecordValidationException(new InvalidRecordException(
>   "One or more records have been rejected due to " + errors.toString()), 
> errors) {code}
> We also need to add a toString() method to the ProduceResponse.RecordError 
> class:
> {code:java}
> @Override
> public String toString() {
>     return "RecordError("
>             + "batchIndex=" + batchIndex
>             + ", message="
>             + ((message == null) ? "null" : "'" + message + "'")
>             + ")";
> } {code}
> The toString method of ProduceResponse.PartitionResponse already calls the 
> toString method of ProduceResponse.RecordError, *but the 
> RecordError#toString method has been missing until now.*
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
