[ https://issues.apache.org/jira/browse/KAFKA-13694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17498734#comment-17498734 ]
RivenSun edited comment on KAFKA-13694 at 2/28/22, 7:08 AM:
------------------------------------------------------------
Hi [~guozhang], if we simply change the code to call `processFailedRecord(topicPartition, rve)`, the actual effect is worse:
1) The specific recordErrors are still not printed.
2) The full stack trace is printed, which adds logging overhead.
{code:java}
[2022-02-28 14:43:56,404] ERROR [ReplicaManager broker=0] Error processing append operation on partition rivenTest4-0 (kafka.server.ReplicaManager)
kafka.common.RecordValidationException: org.apache.kafka.common.InvalidRecordException: One or more records have been rejected
	at kafka.log.LogValidator$.processRecordErrors(LogValidator.scala:579)
	at kafka.log.LogValidator$.$anonfun$assignOffsetsNonCompressed$1(LogValidator.scala:313)
	at java.base/java.lang.Iterable.forEach(Iterable.java:75)
	at kafka.log.LogValidator$.assignOffsetsNonCompressed(LogValidator.scala:290)
	at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:108)
	at kafka.log.UnifiedLog.append(UnifiedLog.scala:807)
	at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:723)
	at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1059)
	at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1047)
	at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:930)
	at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
	at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
	at scala.collection.mutable.HashMap.map(HashMap.scala:35)
	at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:918)
	at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:589)
	at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:661)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:172)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.InvalidRecordException: One or more records have been rejected
{code}
So I suggest modifying the *LogValidator#processRecordErrors* method to remove the check on the Errors type: {*}even if new INVALID_RECORD types are added in the future{*}, we return uniformly. In fact, the server does not print the specific recordErrors even {*}for Errors.INVALID_TIMESTAMP{*} today.
1) processRecordErrors method
{code:java}
private def processRecordErrors(recordErrors: Seq[ApiRecordError]): Unit = {
  if (recordErrors.nonEmpty) {
    val errors = recordErrors.map(_.recordError)
    throw new RecordValidationException(new InvalidRecordException(
      "One or more records have been rejected due to " + errors), errors)
  }
}
{code}
2) As mentioned at the end of the JIRA description, we also need to add a toString() method to the ProduceResponse.RecordError class. Modified result:
{code:java}
[2022-02-28 14:53:23,418] ERROR [ReplicaManager broker=0] Error processing append operation on partition rivenTest4-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.InvalidRecordException: One or more records have been rejected due to ArrayBuffer(RecordError(batchIndex=0, message='Compacted topic cannot accept message without key in topic partition rivenTest4-0.'))
[2022-02-28 14:53:38,316] ERROR [ReplicaManager broker=0] Error processing append operation on partition rivenTest4-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.InvalidRecordException: One or more records have been rejected due to ArrayBuffer(RecordError(batchIndex=0, message='Compacted topic cannot accept message without key in topic partition rivenTest4-0.'))
{code}
WDYT? Thanks.
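To make the proposed message format concrete, here is a minimal, self-contained Java sketch (the class `RecordErrorDemo` and the helper `buildMessage` are illustrative stand-ins, not Kafka code; only the `toString()` body mirrors the proposal above). It shows how concatenating the error list into the exception message surfaces the per-record details that are currently discarded:
{code:java}
import java.util.List;

public class RecordErrorDemo {
    // Simplified stand-in for ProduceResponse.RecordError with the proposed toString().
    static final class RecordError {
        final int batchIndex;
        final String message;

        RecordError(int batchIndex, String message) {
            this.batchIndex = batchIndex;
            this.message = message;
        }

        @Override
        public String toString() {
            return "RecordError("
                + "batchIndex=" + batchIndex
                + ", message=" + ((message == null) ? "null" : "'" + message + "'")
                + ")";
        }
    }

    // Mirrors the proposed processRecordErrors change: fold every record error
    // into the exception message instead of discarding it.
    static String buildMessage(List<RecordError> errors) {
        return "One or more records have been rejected due to " + errors;
    }

    public static void main(String[] args) {
        List<RecordError> errors = List.of(
            new RecordError(0, "Compacted topic cannot accept message without key"));
        // Prints the full per-record detail, not just the generic rejection line.
        System.out.println(buildMessage(errors));
    }
}
{code}
Note that in the Scala broker code the same concatenation renders the collection as `ArrayBuffer(...)` (as in the log above), while a Java `List` renders as `[...]`; if a cleaner rendering is wanted, something like `errors.mkString(", ")` on the Scala side would avoid exposing the collection type name.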
> Some InvalidRecordException messages are thrown away
> ----------------------------------------------------
>
>                 Key: KAFKA-13694
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13694
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients, core
>    Affects Versions: 3.0.0
>            Reporter: RivenSun
>            Priority: Major
>
> 1.Example
> Topic level config: "cleanup.policy":"compact"
> But when the producer sends the message, the ProducerRecord does not specify the key.
>
> producer.log
> {code:java}
> [kafka-producer-network-thread | producer-1] ERROR us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:One or more records have been rejected {code}
>
> server.log
> {code:java}
> [2022-02-25 02:14:54,411] ERROR [ReplicaManager broker=1] Error processing append operation on partition rivenTest4-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.InvalidRecordException: One or more records have been rejected {code}
> From the producer and server logs we only know that the message was rejected by the server, not why.
> Compare this with the RecordTooLargeException test case: there the producer clearly reports the reason for the failure, and the server prints no log at all (the reason is explained later).
> producer_message_too_large.log:
> {code:java}
> [kafka-producer-network-thread | producer-1] ERROR us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The request included a message larger than the max message size the server will accept.
> [kafka-producer-network-thread | producer-1] ERROR us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The request included a message larger than the max message size the server will accept. {code}
> 2.RootCause
> ReplicaManager#appendToLocalLog(...) ->
> Partition#appendRecordsToLeader(...) ->
> UnifiedLog#appendAsLeader(...) -> UnifiedLog#append(...) ->
> LogValidator#validateMessagesAndAssignOffsets(...)
> 1) Analyzing the validateMessagesAndAssignOffsets method:
> The LogValidator#validateRecord method calls validateKey and validateTimestamp, collecting the error information of all messages into a Seq[ApiRecordError].
> In the subsequent processRecordErrors(recordErrors) call, only Errors.INVALID_TIMESTAMP currently gets special handling; because validateKey returns the generic Errors.INVALID_RECORD, the code falls through to
> {code:java}
> else {
>   throw new RecordValidationException(new InvalidRecordException(
>     "One or more records have been rejected"), errors)
> }{code}
> In fact, the *errors* variable here contains the specific information of each recordError, but that information is never put into the message of the InvalidRecordException.
> 2) The exception thrown by processRecordErrors is caught by ReplicaManager#appendToLocalLog(...). Analyzing the exception-handling code of appendToLocalLog also shows why the server prints no log for RecordTooLargeException.
> In the `case rve: RecordValidationException` branch,
> the server prints the log via the processFailedRecord method,
> and sends a response to the client via LogAppendResult.
> Both of these use only rve.invalidException;
> rve.recordErrors is neither printed by the server nor returned to the client.
> 3.Solution
> There are two solutions; I prefer the second.
> 1) Similar to Errors.INVALID_TIMESTAMP, have the validateKey method return Errors.INVALID_RECORD_WITHOUT_KEY, and add special handling for Errors.INVALID_RECORD_WITHOUT_KEY in the processRecordErrors method.
> 2) Modify the logic of the processRecordErrors method so that it no longer distinguishes the types of Errors; {*}even if new INVALID_RECORD types are added in the future{*}, we uniformly return:
> {code:java}
> throw new RecordValidationException(new InvalidRecordException(
>   "One or more records have been rejected due to " + errors.toString()),
>   errors) {code}
> We also need to add a toString() method to the ProduceResponse.RecordError class:
> {code:java}
> @Override
> public String toString() {
>     return "RecordError("
>         + "batchIndex=" + batchIndex
>         + ", message=" + ((message == null) ? "null" : "'" + message + "'")
>         + ")";
> } {code}
> The toString method of ProduceResponse.PartitionResponse has always called the toString method of ProduceResponse.RecordError, *but the RecordError#toString method itself was missing until now.*
-- This message was sent by Atlassian Jira (v8.20.1#820001)