[ https://issues.apache.org/jira/browse/KAFKA-13694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17498734#comment-17498734 ]

RivenSun edited comment on KAFKA-13694 at 2/28/22, 7:08 AM:
------------------------------------------------------------

Hi [~guozhang] 

If we simply change the call to `processFailedRecord(topicPartition, rve)`, the
actual effect will be worse:
1) The specific recordErrors still won't be printed.
2) The full exception stack trace will be printed, which causes a performance loss.
{code:java}
[2022-02-28 14:43:56,404] ERROR [ReplicaManager broker=0] Error processing append operation on partition rivenTest4-0 (kafka.server.ReplicaManager)
kafka.common.RecordValidationException: org.apache.kafka.common.InvalidRecordException: One or more records have been rejected
    at kafka.log.LogValidator$.processRecordErrors(LogValidator.scala:579)
    at kafka.log.LogValidator$.$anonfun$assignOffsetsNonCompressed$1(LogValidator.scala:313)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at kafka.log.LogValidator$.assignOffsetsNonCompressed(LogValidator.scala:290)
    at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:108)
    at kafka.log.UnifiedLog.append(UnifiedLog.scala:807)
    at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:723)
    at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1059)
    at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1047)
    at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:930)
    at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
    at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
    at scala.collection.mutable.HashMap.map(HashMap.scala:35)
    at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:918)
    at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:589)
    at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:661)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:172)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.InvalidRecordException: One or more records have been rejected {code}
So I suggest modifying the *LogValidator#processRecordErrors* method to remove
the check on the Errors type, so that {*}even if new INVALID_RECORD types are
added in the future{*}, we return them uniformly. In fact, the server does not
print the specific recordErrors {*}for Errors.INVALID_TIMESTAMP{*} either.
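For contrast, this is roughly the current shape of the method (a reconstructed
sketch based on the snippet quoted in the JIRA description below; the exact
message text of the timestamp branch may differ slightly):
{code:java}
// Reconstructed sketch of the current LogValidator#processRecordErrors:
// only Errors.INVALID_TIMESTAMP is special-cased, and every other record-level
// error falls into the generic branch that drops the per-record details.
private def processRecordErrors(recordErrors: Seq[ApiRecordError]): Unit = {
  if (recordErrors.nonEmpty) {
    val errors = recordErrors.map(_.recordError)
    if (recordErrors.exists(_.apiError == Errors.INVALID_TIMESTAMP)) {
      throw new RecordValidationException(new InvalidTimestampException(
        "One or more records have been rejected due to invalid timestamp"), errors)
    } else {
      // The missing-key case ends up here: the per-record messages in `errors`
      // never make it into the exception message that gets logged.
      throw new RecordValidationException(new InvalidRecordException(
        "One or more records have been rejected"), errors)
    }
  }
} {code}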

1) Proposed processRecordErrors method
{code:java}
private def processRecordErrors(recordErrors: Seq[ApiRecordError]): Unit = {
  if (recordErrors.nonEmpty) {
    val errors = recordErrors.map(_.recordError)
    throw new RecordValidationException(new InvalidRecordException(
      "One or more records have been rejected due to " + errors), errors)
  }
} {code}
2) As mentioned at the end of the JIRA description, we also need to add a
toString() method to the ProduceResponse.RecordError class.
Modified result:
{code:java}
[2022-02-28 14:53:23,418] ERROR [ReplicaManager broker=0] Error processing append operation on partition rivenTest4-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.InvalidRecordException: One or more records have been rejected due to ArrayBuffer(RecordError(batchIndex=0, message='Compacted topic cannot accept message without key in topic partition rivenTest4-0.'))
[2022-02-28 14:53:38,316] ERROR [ReplicaManager broker=0] Error processing append operation on partition rivenTest4-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.InvalidRecordException: One or more records have been rejected due to ArrayBuffer(RecordError(batchIndex=0, message='Compacted topic cannot accept message without key in topic partition rivenTest4-0.')) {code}
WDYT?
Thanks.



> Some InvalidRecordException messages are thrown away
> ----------------------------------------------------
>
>                 Key: KAFKA-13694
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13694
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients, core
>    Affects Versions: 3.0.0
>            Reporter: RivenSun
>            Priority: Major
>
> 1. Example
> Topic-level config: "cleanup.policy":"compact"
> But when the producer sends a message, the ProducerRecord does not specify
> the key.
>  
> producer.log
> {code:java}
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:One 
> or more records have been rejected {code}
>  
>  
> server.log
> {code:java}
> [2022-02-25 02:14:54,411] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition rivenTest4-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.InvalidRecordException: One or more records have been 
> rejected {code}
> From the producer and server logs we cannot tell why the send failed, only
> that the message was rejected by the server.
> Compare this with the RecordTooLargeException test case: there the producer
> log clearly shows the reason for the failure, and the server does not print
> a log at all (the reason is explained later).
> producer_message_too_large.log :
> {code:java}
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The 
> request included a message larger than the max message size the server will 
> accept.
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The 
> request included a message larger than the max message size the server will 
> accept. {code}
> 2. RootCause
> ReplicaManager#appendToLocalLog(...) ->
> Partition#appendRecordsToLeader(...) ->
> UnifiedLog#appendAsLeader(...) -> UnifiedLog#append(...) ->
> LogValidator#validateMessagesAndAssignOffsets(...)
> 1) Analyzing the validateMessagesAndAssignOffsets method:
> The LogValidator#validateRecord method calls validateKey and
> validateTimestamp and collects the error information of all records as a
> Seq[ApiRecordError].
> In the subsequent processRecordErrors(recordErrors) call, special handling
> currently exists only for Errors.INVALID_TIMESTAMP. Because the error
> returned by validateKey is the ordinary Errors.INVALID_RECORD, the code
> falls into the following branch:
> {code:java}
> else {
>   throw new RecordValidationException(new InvalidRecordException(
>     "One or more records have been rejected"), errors)
> }{code}
> In fact, the *errors* variable here contains the specific information of each
> recordError, but we do not put that information into the message of the
> InvalidRecordException.
> 2) The exception thrown by processRecordErrors is caught by
> ReplicaManager#appendToLocalLog(...), so let us look at the exception
> handling code of appendToLocalLog.
> This also explains why the server does not print a log for
> RecordTooLargeException.
> Under `case rve: RecordValidationException`,
> the server prints the log via the processFailedRecord method
> and sends a response to the client via LogAppendResult.
> In both places only rve.invalidException is used;
> rve.recordErrors is neither printed by the server nor returned to the
> client.
> 3. Solution
> There are two solutions; I prefer the second.
> 1) Similar to Errors.INVALID_TIMESTAMP, let the validateKey method return a
> new Errors.INVALID_RECORD_WITHOUT_KEY, and add special handling for
> Errors.INVALID_RECORD_WITHOUT_KEY in the processRecordErrors method.
> 2) Modify the logic of the processRecordErrors method to no longer
> distinguish the types of Errors, so that {*}even if new INVALID_RECORD types
> are added in the future{*}, we uniformly return:
> {code:java}
> throw new RecordValidationException(new InvalidRecordException(
>   "One or more records have been rejected due to " + errors.toString()), 
> errors) {code}
> We also need to add a toString() method to the ProduceResponse.RecordError class:
> {code:java}
> @Override
> public String toString() {
>     return "RecordError("
>             + "batchIndex=" + batchIndex
>             + ", message=" + ((message == null) ? "null" : "'" + message + 
> "'")
>             + ")";
> } {code}
> The toString method of ProduceResponse.PartitionResponse already calls the
> toString method of ProduceResponse.RecordError, *but the RecordError#toString
> override was missing.*
>  



