[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17778603#comment-17778603 ]

Divij Vaidya commented on KAFKA-15653:
--------------------------------------

I think I have found a fairly fundamental problem here.

By design, BufferPool is not thread safe in Kafka [1], and that is acceptable 
because each buffer pool instance is local to a request handler thread [2]. 
Hence we assume that a particular buffer pool will only ever be accessed by 
its owner thread.
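
To make that ownership model concrete, here is a minimal standalone sketch of 
the assumption (class and thread names are made up, this is not broker code): 
each request handler thread creates and uses only its own supplier, so no 
synchronization is needed inside BufferSupplier itself.

{code:java}
import java.nio.ByteBuffer;

import org.apache.kafka.common.utils.BufferSupplier;

public class PerThreadBufferDemo {
    public static void main(String[] args) throws Exception {
        Runnable handlerLoop = () -> {
            // One supplier per thread: BufferSupplier.create() returns a supplier
            // that is not safe for concurrent use, which is fine as long as only
            // the owning thread ever touches it.
            BufferSupplier supplier = BufferSupplier.create();
            ByteBuffer buffer = supplier.get(16 * 1024);
            // ... use the buffer, e.g. for decompression ...
            supplier.release(buffer);
        };
        Thread t1 = new Thread(handlerLoop, "request-handler-0");
        Thread t2 = new Thread(handlerLoop, "request-handler-1");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}
{code}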

However, unfortunately, it seems that this assumption does not hold.

I have observed the same BufferPool being accessed by two different request 
handler threads. The first access is legitimate, while processing an API 
request at [3]. The second access comes from a change introduced in 
[https://github.com/apache/kafka/commit/56dcb837a2f1c1d8c016cfccf8268a910bb77a36], 
where we pass the stateful variables of a request (including the buffer 
supplier) into a callback which may be executed by a different request handler 
thread [4]. Hence we end up in a situation where stateful members such as the 
requestLocal of one thread are accessed by another thread. This is a bug. It 
has been present since 3.5, but the impact is more visible in 3.6 because in 
3.6 we expanded the use of the request-local buffer pool to perform 
decompression. I am yet to see what other scenarios it can impact, but 
anywhere we use the buffer pool might be at risk.
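
To illustrate the problematic pattern (simplified names, not the actual 
ReplicaManager code path): a callback captures the supplier owned by the 
thread that received the request and is later run on a different handler 
thread, so the same non-thread-safe supplier is touched by two threads without 
any synchronization.

{code:java}
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.common.utils.BufferSupplier;

public class LeakedSupplierDemo {
    public static void main(String[] args) throws Exception {
        // Supplier owned by the handler thread that received the request.
        BufferSupplier ownerSupplier = BufferSupplier.create();
        // Stand-in for the other request handler thread running the callback.
        ExecutorService otherHandler = Executors.newSingleThreadExecutor();

        // The owner keeps using its supplier ...
        ByteBuffer b1 = ownerSupplier.get(1024);

        // ... while the callback, executed on another thread, uses the same
        // supplier it captured. Nothing coordinates these two accesses.
        otherHandler.submit(() -> {
            ByteBuffer b2 = ownerSupplier.get(1024);
            ownerSupplier.release(b2);
        }).get();

        ownerSupplier.release(b1);
        otherHandler.shutdown();
    }
}
{code}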

[~jolshan] can you please take a look and see whether we can avoid leaking the 
requestLocal (buffer pool) belonging to one thread to another thread?

One way we can fix this is to store the buffer pool reference in a ThreadLocal 
[5]: instead of passing the reference around, whenever we want to use the 
buffer pool we ask the executing thread for it directly. This ensures that a 
thread always uses its own instance of the buffer pool. [~ijuma] since you 
wrote the original requestLocal buffer pool, what do you think about this 
solution?
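
A rough sketch of that idea (assumed names, not a patch): each thread lazily 
gets its own supplier from a ThreadLocal, and callers never hold a reference 
that can cross threads.

{code:java}
import java.nio.ByteBuffer;

import org.apache.kafka.common.utils.BufferSupplier;

public class ThreadLocalBufferSupplier {
    // One supplier per thread, created lazily on that thread's first use.
    private static final ThreadLocal<BufferSupplier> SUPPLIER =
        ThreadLocal.withInitial(BufferSupplier::create);

    // Callers always ask the currently executing thread for its own supplier
    // instead of passing a reference into callbacks.
    public static BufferSupplier current() {
        return SUPPLIER.get();
    }

    public static void main(String[] args) {
        ByteBuffer buffer = current().get(8 * 1024);
        // ... decompress / validate records using this thread's own pool ...
        current().release(buffer);
    }
}
{code}

A callback scheduled onto another handler thread would then transparently use 
that thread's own supplier rather than the one belonging to the thread that 
created the callback.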

[1] [https://github.com/apache/kafka/blob/9c77c17c4eae19af743e551e8e7d8b49b07c4e99/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java#L27]

[2] [https://github.com/apache/kafka/blob/9c77c17c4eae19af743e551e8e7d8b49b07c4e99/core/src/main/scala/kafka/server/KafkaRequestHandler.scala#L96]

[3] [https://github.com/apache/kafka/blob/9c77c17c4eae19af743e551e8e7d8b49b07c4e99/core/src/main/scala/kafka/server/KafkaRequestHandler.scala#L154]

[4] [https://github.com/apache/kafka/blob/526d0f63b5d82f8fb50c97aea9c61f8f85467e92/core/src/main/scala/kafka/server/ReplicaManager.scala#L874]

[5] [https://docs.oracle.com/javase/8/docs/api/java/lang/ThreadLocal.html]

> NPE in ChunkedByteStream
> ------------------------
>
>                 Key: KAFKA-15653
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15653
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 3.6.0
>         Environment: Docker container on a Linux laptop, using the latest 
> release.
>            Reporter: Travis Bischel
>            Assignee: Divij Vaidya
>            Priority: Major
>         Attachments: repro.sh
>
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR 
> from producing. The broker logs for the failing request:
>  
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing 
> append operation on partition 
> 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 
> (kafka.server.ReplicaManager)
> java.lang.NullPointerException
>       at 
> org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
>       at 
> org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
>       at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>       at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>       at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>       at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>       at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>       at kafka.log.UnifiedLog.append(UnifiedLog.scala:805)
>       at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>       at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>       at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>       at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>       at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>       at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>       at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>       at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
>       at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754)
>       at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874)
>       at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874)
>       at 
> kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
>       at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}


