[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17778603#comment-17778603 ]

Divij Vaidya edited comment on KAFKA-15653 at 10/23/23 10:33 AM:
-----------------------------------------------------------------

I think I have found a fairly fundamental problem here.

By design, BufferPool is not thread safe in Kafka [1]. That is acceptable 
because the buffer pool instance is local to a request handler thread [2]. 
Hence we assume that a particular buffer pool will only ever be accessed by 
its owner thread.

Unfortunately, it seems this assumption does not hold!

I have observed the same BufferPool being accessed by two different request 
handler threads. The first access is legitimate, while processing an API 
request at [3]. The second access comes from a change introduced in 
[https://github.com/apache/kafka/commit/56dcb837a2f1c1d8c016cfccf8268a910bb77a36], 
where we pass the stateful members of a request (including the buffer 
supplier) to a callback which may be executed by a different request handler 
thread [4]. Hence, we end up in a situation where stateful members such as 
one thread's requestLocal are accessed by a different thread. 
This is a bug.
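To make the failure mode concrete, here is a minimal, self-contained sketch. The class and method names are illustrative stand-ins, not Kafka's actual API: it models a thread-confined, non-thread-safe buffer supplier with an explicit owner check, so that the cross-thread access described above fails loudly instead of corrupting state.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

public class CrossThreadDemo {

    // Illustrative stand-in for Kafka's BufferSupplier: a simple,
    // NON-thread-safe free list of buffers, meant to be owned by exactly
    // one request handler thread.
    static final class LocalBufferSupplier {
        private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
        private final Thread ownerThread = Thread.currentThread();

        ByteBuffer get(int size) {
            // Guard that makes the bug visible: reject access from any
            // thread other than the owner.
            if (Thread.currentThread() != ownerThread) {
                throw new IllegalStateException("supplier owned by " + ownerThread.getName()
                        + " used from " + Thread.currentThread().getName());
            }
            ByteBuffer buf = free.poll();
            return buf != null ? buf : ByteBuffer.allocate(size);
        }

        void release(ByteBuffer buf) {
            buf.clear();
            free.offer(buf);
        }
    }

    // Returns true if use from a second "handler" thread is rejected.
    public static boolean crossThreadAccessFails() throws InterruptedException {
        LocalBufferSupplier supplier = new LocalBufferSupplier();
        supplier.release(supplier.get(1024)); // legitimate same-thread use

        final boolean[] failed = {false};
        Thread other = new Thread(() -> {
            try {
                supplier.get(1024); // simulates the callback running elsewhere
            } catch (IllegalStateException e) {
                failed[0] = true;
            }
        }, "other-handler");
        other.start();
        other.join(); // join() guarantees visibility of failed[0]
        return failed[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("cross-thread access rejected: " + crossThreadAccessFails());
    }
}
```

The real BufferSupplier has no such owner check, which is why the misuse surfaces only indirectly, e.g. as the NPE in ChunkedBytesStream below.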

*Impact of the bug*

This bug has been present since 3.5, but its impact becomes visible in 3.6 
because in 3.6 we expanded the use of the request-local buffer pool to perform 
decompression. Earlier, the buffer pool was only used in the read path and by 
the log cleaner. The callback mentioned above calls appendToLocalLog, and 
prior to 3.6 this code path did not use requestLocal.

All 3.6 operations which reach the following line in ReplicaManager are 
impacted, which is essentially the transactions use case, and specifically the 
AddPartitionsToTxnManager API calls.

```
addPartitionsToTxnManager.foreach(_.addTxnData(node, notYetVerifiedTransaction,
  KafkaRequestHandler.wrap(appendEntries(entriesPerPartition)(_))))
```

*Possible solutions*
[~jolshan] can you please take a look and see if we can avoid leaking the 
requestLocal (buffer pool) belonging to one thread to another thread?

One way to fix this is to store the buffer pool reference in a ThreadLocal [5] 
and, instead of passing the reference around, ask the executing thread for it 
directly whenever we want to use the buffer pool. This ensures that a thread 
always uses its own instance of the buffer pool. [~ijuma], since you wrote 
the original requestLocal buffer pool, what do you think about this solution?
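A rough sketch of what that could look like (illustrative names, not Kafka's actual API): each thread lazily creates its own free list, and every lookup resolves against the *executing* thread, so a callback scheduled on a different handler thread can never touch the originating thread's pool.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

public class PerThreadBufferSupplier {
    // One free list per thread; created lazily on first use by that thread.
    private static final ThreadLocal<ArrayDeque<ByteBuffer>> POOL =
            ThreadLocal.withInitial(ArrayDeque::new);

    public static ByteBuffer get(int size) {
        // Always resolves against the calling thread's own pool.
        ByteBuffer buf = POOL.get().poll();
        return buf != null ? buf : ByteBuffer.allocate(size);
    }

    public static void release(ByteBuffer buf) {
        buf.clear();
        POOL.get().offer(buf);
    }

    public static void main(String[] args) {
        ByteBuffer a = get(64);
        release(a);
        // The same thread gets its cached buffer back; another thread would
        // start from its own (empty) pool and allocate a fresh one.
        System.out.println("same thread reuses its buffer: " + (a == get(64)));
    }
}
```

The trade-off is that the pool's lifetime is then tied to the thread rather than the request, so cleanup/metrics would need rethinking.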

Another option is to extract the `appendEntries` function out of 
`appendRecords`, since `appendEntries` is invoked as a callback and 
shouldn't rely on the local variables/state of `appendRecords`.
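The shape of that second option, reduced to its essence (signatures are illustrative, not Kafka's actual code): a callback that closes over the submitting thread's state keeps using that state wherever it runs, while a parameterized callback lets the executing thread inject its own state at invocation time.

```java
import java.util.function.Function;
import java.util.function.Supplier;

public class CallbackStyle {

    // Hypothetical stand-in for Kafka's RequestLocal.
    static final class RequestLocal {
        final String owner;
        RequestLocal(String owner) { this.owner = owner; }
    }

    // Problematic shape: the closure captures the submitter's RequestLocal,
    // so whichever thread later runs it still touches the submitter's state.
    static Supplier<String> capturing(RequestLocal requestLocal) {
        return () -> "using state of " + requestLocal.owner;
    }

    // Safer shape: the state is a parameter, supplied by whichever thread
    // actually executes the callback.
    static final Function<RequestLocal, String> PARAMETERIZED =
            local -> "using state of " + local.owner;

    public static void main(String[] args) {
        Supplier<String> cb = capturing(new RequestLocal("handler-A"));
        System.out.println(cb.get()); // handler-A's state, even if run on handler-B
        System.out.println(PARAMETERIZED.apply(new RequestLocal("handler-B")));
    }
}
```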



[1] 
[https://github.com/apache/kafka/blob/9c77c17c4eae19af743e551e8e7d8b49b07c4e99/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java#L27]
 

[2] 
[https://github.com/apache/kafka/blob/9c77c17c4eae19af743e551e8e7d8b49b07c4e99/core/src/main/scala/kafka/server/KafkaRequestHandler.scala#L96]
 

[3] 
[https://github.com/apache/kafka/blob/9c77c17c4eae19af743e551e8e7d8b49b07c4e99/core/src/main/scala/kafka/server/KafkaRequestHandler.scala#L154]
 

[4] 
[https://github.com/apache/kafka/blob/526d0f63b5d82f8fb50c97aea9c61f8f85467e92/core/src/main/scala/kafka/server/ReplicaManager.scala#L874]
 

[5] [https://docs.oracle.com/javase/8/docs/api/java/lang/ThreadLocal.html] 



> NPE in ChunkedByteStream
> ------------------------
>
>                 Key: KAFKA-15653
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15653
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 3.6.0
>         Environment: Docker container on a Linux laptop, using the latest 
> release.
>            Reporter: Travis Bischel
>            Assignee: Divij Vaidya
>            Priority: Major
>         Attachments: repro.sh
>
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR 
> from producing. The broker logs for the failing request:
>  
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing 
> append operation on partition 
> 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 
> (kafka.server.ReplicaManager)
> java.lang.NullPointerException
>       at 
> org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
>       at 
> org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
>       at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>       at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>       at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>       at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>       at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>       at kafka.log.UnifiedLog.append(UnifiedLog.scala:805)
>       at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>       at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>       at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>       at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>       at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>       at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>       at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>       at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
>       at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754)
>       at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874)
>       at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874)
>       at 
> kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
>       at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
