Antonio Tomac created KAFKA-12999:
-------------------------------------
Summary: NPE when accessing RecordHeader.key() concurrently
Key: KAFKA-12999
URL: https://issues.apache.org/jira/browse/KAFKA-12999
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 2.8.0
Reporter: Antonio Tomac
h2. Summary
After upgrading clients to {{2.8.0}}, reading the header keys of a {{ConsumerRecord}} occasionally throws {{java.lang.NullPointerException}} when the keys are accessed concurrently from multiple (here, two) threads.

h2. Where
The NPE happens here, at [RecordHeader.java:45|https://github.com/apache/kafka/blob/2.8.0/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java#L45]:
{code:java}
public String key() {
    if (key == null) {
        key = Utils.utf8(keyBuffer, keyBuffer.remaining()); // NPE here
        keyBuffer = null;
    }
    return key;
}
{code}

h2. When/why
The issue was introduced by the changes in KAFKA-10438, which avoid creating the key's *{color:#0747a6}{{String}}{color}* when it might never be used. It is a good optimization, but this *lazy* initialization of the field {{RecordHeader.key}} causes a problem when two threads access (and therefore initialize) it concurrently: {{key()}} is no longer a read-only operation, and there is a race between initializing {color:#0747a6}*{{key}}*{color} and nullifying {color:#0747a6}*{{keyBuffer}}*{color}. For example, thread A sees {{key == null}}; before it reads {{keyBuffer}}, thread B runs the whole {{if}} block and sets {{keyBuffer = null}}; thread A then dereferences the now-null {{keyBuffer}} and throws the NPE.

h2. Simple workaround
After consuming record(s), and before passing a {color:#0747a6}*{{ConsumerRecord}}*{color} to multiple processing threads, eagerly initialize all header keys by iterating through the headers and invoking {color:#0747a6}*{{key()}}*{color} on each one, or simply call {color:#0747a6}*{{ConsumerRecord.headers().hashCode()}}*{color}, which initializes all keys (and header values too).

h2. Consequences
The current implementation makes {{RecordHeader}} unsafe even for read-only access from multiple threads.

h2. Reproducibility
With enough iterations it is always reproducible (at least on my local machine). Here is a minimal snippet to reproduce it:
{code:java}
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.common.header.internals.RecordHeader;
import org.junit.jupiter.api.Test;

public class RecordHeaderConcurrencyTest { // test class name is illustrative

    @Test
    public void testConcurrentKeyInit() throws ExecutionException, InterruptedException {
        ByteBuffer keyBuffer = ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8));
        ByteBuffer valueBuffer = ByteBuffer.wrap("value".getBytes(StandardCharsets.UTF_8));
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < 1_000_000; i++) {
                // Fresh header each iteration, so its key starts uninitialized
                RecordHeader header = new RecordHeader(keyBuffer, valueBuffer);
                // The worker thread and the test thread race to initialize the key
                Future<String> future = executorService.submit(header::key);
                assertEquals("key", header.key());
                assertEquals("key", future.get());
            }
        } finally {
            executorService.shutdown();
        }
    }
}
{code}

h2. Possible solution #1
Leave the implementation as-is, but document for users that concurrent access to header keys and values is not safe.

h2. Possible solution #2
Add concurrency primitives to the current implementation:
* Simply adding {color:#0747a6}*{{synchronized}}*{color} to the method *{color:#0747a6}{{key()}}{color}* (and to *{color:#0747a6}{{value()}}{color}* too) gives correct behaviour, avoiding the race conditions.
* A JMH benchmark comparing *{color:#0747a6}{{key()}}{color}* with and without {color:#0747a6}*{{synchronized}}*{color} showed no significant performance penalty:
{code}
Benchmark                              Mode  Cnt   Score   Error  Units
RecordHeaderBenchmark.key              avgt   15  31.308 ± 7.862  ns/op
RecordHeaderBenchmark.synchronizedKey  avgt   15  31.853 ± 7.096  ns/op
{code}
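The race, the simple workaround, and possible solution #2 can be sketched without a Kafka dependency. This is a minimal sketch under stated assumptions: {{RacyHeader}}, {{SyncHeader}} and {{HeaderWarmUp}} are hypothetical illustration classes, not Kafka API. {{RacyHeader}} only mimics the lazy-initialization pattern of {{RecordHeader.key()}} in 2.8.0, and its {{decode}} helper is an assumed stand-in for {{Utils.utf8}}.
{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical stand-in mimicking RecordHeader.key() in 2.8.0: the key is
// decoded lazily and the buffer is then released. NOT thread-safe.
final class RacyHeader {
    private String key;
    private ByteBuffer keyBuffer;

    RacyHeader(ByteBuffer keyBuffer) {
        this.keyBuffer = keyBuffer;
    }

    String key() {
        if (key == null) {
            // A second thread can get here after the first thread has already
            // set keyBuffer = null, and then fails with an NPE in decode().
            key = decode(keyBuffer);
            keyBuffer = null;
        }
        return key;
    }

    // Assumed substitute for Utils.utf8: decodes the remaining bytes as UTF-8
    // without moving the caller's buffer position.
    static String decode(ByteBuffer buffer) {
        ByteBuffer copy = buffer.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

// Sketch of solution #2: the same lazy initialization, guarded by the
// header's own monitor so only one thread can run the if-block at a time.
final class SyncHeader {
    private String key;
    private ByteBuffer keyBuffer;

    SyncHeader(ByteBuffer keyBuffer) {
        this.keyBuffer = keyBuffer;
    }

    synchronized String key() {
        if (key == null) {
            key = RacyHeader.decode(keyBuffer);
            keyBuffer = null;
        }
        return key;
    }
}

// Sketch of the workaround: initialize every key on the consuming thread
// before the headers are handed to other threads.
final class HeaderWarmUp {
    static void initKeys(List<RacyHeader> headers) {
        for (RacyHeader header : headers) {
            header.key();
        }
    }
}
{code}
With the real client, the warm-up would instead iterate over {{record.headers()}} and call {{key()}} on each {{Header}} on the polling thread. The handoff matters too: submitting the warmed-up record through an {{ExecutorService}} (or another safe-publication mechanism) establishes a happens-before edge, so worker threads see the already-initialized {{key}} and never touch {{keyBuffer}}.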