[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17777647#comment-17777647 ]
Divij Vaidya commented on KAFKA-15653: -------------------------------------- In 3.6, we started using Buffer pool local to each request handler thread to perform decompression. The above stack trace indicates a null when we ask for a buffer from the buffer pool, returned by bufferQueue.pollfirst() at [https://github.com/apache/kafka/blob/c81a7252195261f649faba166ee723552bed4d81/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java#L76] Ideally that should not be possible bufferQueue.pollfirst() returns null only when bufferQueue is empty. And we already check fr bufferQueue being empty in the line above. But this function is not thread safe. It doesn't need to be because a particular instance of buffer pool (DefaultSupplier) is associated with single thread (the request handler thread) and only one thread should be accessing it at one time. I will try to eyeball to code here to see if I can find something. But practically [~twmb] it would be greatly useful if you can share your integration/unit test with us so that we can find a deterministic way to reproduce it. > NPE in ChunkedByteStream > ------------------------ > > Key: KAFKA-15653 > URL: https://issues.apache.org/jira/browse/KAFKA-15653 > Project: Kafka > Issue Type: Bug > Components: producer > Affects Versions: 3.6.0 > Environment: Docker container on a Linux laptop, using the latest > release. > Reporter: Travis Bischel > Assignee: Divij Vaidya > Priority: Major > > When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR > from producing. The broker logs for the failing request: > > {noformat} > [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing > append operation on partition > 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 > (kafka.server.ReplicaManager) > java.lang.NullPointerException > at > org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89) > at > org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105) > at > org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273) > at > org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277) > at > org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352) > at > org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358) > at > org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165) > at kafka.log.UnifiedLog.append(UnifiedLog.scala:805) > at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719) > at > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301) > at > kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210) > at > scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28) > at > scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27) > at scala.collection.mutable.HashMap.map(HashMap.scala:35) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198) > at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754) > at > kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874) > at > kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874) > at > kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130) > at java.base/java.lang.Thread.run(Unknown Source) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)