[ 
https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17777647#comment-17777647
 ] 

Divij Vaidya commented on KAFKA-15653:
--------------------------------------

In 3.6, we started using Buffer pool local to each request handler thread to 
perform decompression. The above stack trace indicates a null when we ask for a 
buffer from the buffer pool, returned by bufferQueue.pollfirst() at 
[https://github.com/apache/kafka/blob/c81a7252195261f649faba166ee723552bed4d81/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java#L76]
 

Ideally that should not be possible bufferQueue.pollfirst() returns null only 
when bufferQueue is empty. And we already check fr bufferQueue being empty in 
the line above. But this function is not thread safe. It doesn't need to be 
because a particular instance of buffer pool (DefaultSupplier) is associated 
with single thread (the request handler thread) and only one thread should be 
accessing it at one time. 

I will try to eyeball to code here to see if I can find something. But 
practically [~twmb] it would be greatly useful if you can share your 
integration/unit test with us so that we can find a deterministic way to 
reproduce it.

> NPE in ChunkedByteStream
> ------------------------
>
>                 Key: KAFKA-15653
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15653
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 3.6.0
>         Environment: Docker container on a Linux laptop, using the latest 
> release.
>            Reporter: Travis Bischel
>            Assignee: Divij Vaidya
>            Priority: Major
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR 
> from producing. The broker logs for the failing request:
>  
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing 
> append operation on partition 
> 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 
> (kafka.server.ReplicaManager)
> java.lang.NullPointerException
>       at 
> org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
>       at 
> org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
>       at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>       at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>       at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>       at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>       at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>       at kafka.log.UnifiedLog.append(UnifiedLog.scala:805)
>       at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>       at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>       at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>       at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>       at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>       at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>       at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>       at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
>       at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754)
>       at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874)
>       at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874)
>       at 
> kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
>       at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to