Vinicius Vieira dos Santos created KAFKA-18471:
--------------------------------------------------
Summary: Race conditions when accessing RecordHeader data
Key: KAFKA-18471
URL: https://issues.apache.org/jira/browse/KAFKA-18471
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 3.8.1
Reporter: Vinicius Vieira dos Santos
There is a race condition in the {{RecordHeader}} class of Kafka when an instance is created using the [constructor with ByteBuffer|https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java#L38].
In this scenario, the first call to {{key()}} or {{value()}} lazily copies the {{ByteBuffer}} (the key is decoded into a {{String}}, the value into a byte array) and then sets the buffer field to {{null}}.
If multiple threads call these methods on the same instance at the same time, one thread can finish the copy and set the buffer to {{null}} while another thread, which has already checked that the copy was not done yet, goes on to dereference the now-{{null}} buffer and fails with a {{NullPointerException}}.
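The check-then-act interleaving described above can be replayed deterministically on a single thread. This is a minimal sketch under stated assumptions: {{LazyKey}} is a hypothetical, simplified stand-in for the lazy key decoding in {{RecordHeader}} (not Kafka's actual code), and the {{betweenCheckAndUse}} hook is an illustrative addition that lets a second caller run between the null check and the buffer access, which is the point where the reported {{NullPointerException}} occurs.

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class InterleavingSketch {

    // Hypothetical, simplified stand-in for the lazy key decoding in RecordHeader.
    // It is NOT the Kafka class; the hook exists only to replay one bad interleaving.
    static final class LazyKey {
        private ByteBuffer keyBuffer;
        private String key;
        Runnable betweenCheckAndUse = () -> { };

        LazyKey(ByteBuffer keyBuffer) {
            this.keyBuffer = keyBuffer;
        }

        String key() {
            if (key == null) {                  // step 1: "not decoded yet"
                betweenCheckAndUse.run();       // a second caller may complete here
                // step 2: dereference the buffer; NPE if the other caller already nulled it
                key = StandardCharsets.UTF_8.decode(keyBuffer.duplicate()).toString();
                keyBuffer = null;               // step 3: release the buffer
            }
            return key;
        }
    }

    public static void main(String[] args) {
        LazyKey header = new LazyKey(StandardCharsets.UTF_8.encode("header-key-1"));
        // Simulate "thread B" finishing key() while "thread A" sits between steps 1 and 2.
        header.betweenCheckAndUse = () -> {
            header.betweenCheckAndUse = () -> { }; // B must not re-enter the hook
            header.key();
        };
        try {
            header.key(); // "thread A"
            System.out.println("no failure");
        } catch (NullPointerException e) {
            System.out.println("NPE, as in the reported stack trace");
        }
    }
}
{code}

With real threads the second caller is another thread, so the failure only appears when the scheduler happens to produce this interleaving, which is why the reproduction below needs many iterations.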
Exception example:
{code:java}
Exception in thread "pool-1-thread-3" java.lang.NullPointerException: Cannot invoke "java.nio.ByteBuffer.remaining()" because "this.keyBuffer" is null
    at org.apache.kafka.common.header.internals.RecordHeader.key(RecordHeader.java:45)
    at br.com.autbank.workflow.TestMainExample.lambda$main$0(TestMainExample.java:36)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at br.com.autbank.workflow.TestMainExample.lambda$main$1(TestMainExample.java:32)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
{code}
Code to reproduce the error:
{code:java}
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class TestMainExample {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        try {
            for (int i = 0; i < 200_000; i++) {
                // Headers built from ByteBuffers, so key()/value() copy lazily on first access
                Charset charset = StandardCharsets.UTF_8;
                RecordHeaders headers = new RecordHeaders();
                headers.add(new RecordHeader(charset.encode("header-key-1"), charset.encode("value-1")));
                headers.add(new RecordHeader(charset.encode("header-key-2"), charset.encode("value-2")));
                headers.add(new RecordHeader(charset.encode("header-key-3"),
                        charset.encode("2025-01-06T00:00:00.000000000-00:00[UTC]")));
                headers.add(new RecordHeader(charset.encode("header-key-4"),
                        charset.encode("2025-01-06T00:00:00.000000000-00:00[UTC]")));
                headers.add(new RecordHeader(charset.encode("header-key-5"), charset.encode("account-number")));
                headers.add(new RecordHeader(charset.encode("header-key-6"), charset.encode("operation-id")));
                headers.add(new RecordHeader(charset.encode("header-key-7"), charset.encode("agency-code")));
                headers.add(new RecordHeader(charset.encode("header-key-8"), charset.encode("branch-code")));

                // Five tasks read the same header instances concurrently
                CountDownLatch count = new CountDownLatch(5);
                for (int j = 0; j < 5; j++) {
                    executorService.execute(() -> {
                        try {
                            headers.forEach((hdr) -> {
                                if (hdr.value() == null) {
                                    throw new IllegalStateException("Bug found on value");
                                }
                                if (hdr.key() == null) {
                                    throw new IllegalStateException("Bug found on key");
                                }
                            });
                        } finally {
                            count.countDown();
                        }
                    });
                }
                count.await();
            }
        } finally {
            // Stop the pool so the JVM can exit once all iterations are done
            executorService.shutdown();
        }
    }
}
{code}
As a test, I synchronized the method my application uses to access the headers, and this resolved the problem in my application. However, I believe the ideal solution would be either to document that the class is not thread-safe or to synchronize access to the {{ByteBuffer}}. Thank you in advance to the team.
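A minimal sketch of the "synchronize access" option, assuming the lazy copy is kept: {{SyncLazyHeader}} and {{hammer}} are hypothetical names, and the class is a simplified stand-in for {{RecordHeader}}, not a proposed patch. Making {{key()}} and {{value()}} {{synchronized}} turns the null check, the copy and the release of the buffer into one atomic step per instance, and the shared monitor also makes the cached result visible to other threads.

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SynchronizedHeaderSketch {

    // Hypothetical stand-in for RecordHeader with the lazy copy guarded by the
    // instance monitor. Not Kafka's code; it only illustrates the "synchronize" option.
    static final class SyncLazyHeader {
        private ByteBuffer keyBuffer;
        private String key;
        private ByteBuffer valueBuffer;
        private byte[] value;

        SyncLazyHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
            this.keyBuffer = keyBuffer;
            this.valueBuffer = valueBuffer;
        }

        // Check, copy and release happen under one lock, so no caller sees a half-done state.
        synchronized String key() {
            if (key == null) {
                key = StandardCharsets.UTF_8.decode(keyBuffer.duplicate()).toString();
                keyBuffer = null;
            }
            return key;
        }

        synchronized byte[] value() {
            if (value == null && valueBuffer != null) {
                ByteBuffer copy = valueBuffer.duplicate();
                value = new byte[copy.remaining()];
                copy.get(value);
                valueBuffer = null;
            }
            return value;
        }
    }

    // Reads one shared header from several threads per round and counts failures.
    static int hammer(int rounds, int threads) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger failures = new AtomicInteger();
        try {
            for (int i = 0; i < rounds; i++) {
                SyncLazyHeader header = new SyncLazyHeader(
                        StandardCharsets.UTF_8.encode("header-key-1"),
                        StandardCharsets.UTF_8.encode("value-1"));
                CountDownLatch done = new CountDownLatch(threads);
                for (int j = 0; j < threads; j++) {
                    pool.execute(() -> {
                        try {
                            boolean keyOk = "header-key-1".equals(header.key());
                            boolean valueOk = "value-1".equals(
                                    new String(header.value(), StandardCharsets.UTF_8));
                            if (!keyOk || !valueOk) {
                                failures.incrementAndGet();
                            }
                        } catch (RuntimeException e) {
                            failures.incrementAndGet();
                        } finally {
                            done.countDown();
                        }
                    });
                }
                done.await();
            }
        } finally {
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        return failures.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("failures: " + hammer(10_000, 5));
    }
}
{code}

Running {{main}} should print {{failures: 0}}, but a clean stress run is not a proof on its own; the guarantee comes from every read going through the same monitor. The cost is a lock acquisition on each call; copying eagerly in the constructor would avoid the lock at the price of always paying for the copy.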
--
This message was sent by Atlassian Jira
(v8.20.10#820010)