[ https://issues.apache.org/jira/browse/KAFKA-12999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17370142#comment-17370142 ]
Chia-Ping Tsai commented on KAFKA-12999:
----------------------------------------

Thanks for this ticket. As juma explained, the class is not designed to be thread-safe. Since modern JVMs optimize synchronized blocks for the single-threaded (uncontended) case, adding synchronization to make it thread-safe seems fine to me. Of course, it needs a KIP :)

> NPE when accessing RecordHeader.key() concurrently
> --------------------------------------------------
>
>                 Key: KAFKA-12999
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12999
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.8.0
>            Reporter: Antonio Tomac
>            Priority: Minor
>
> h2. Summary
> After upgrading clients to {{2.8.0}}, reading {{ConsumerRecord}}'s header
> keys started resulting in occasional {{java.lang.NullPointerException}} in
> case of concurrent access from multiple (2) threads.
> h2. Where
> The NPE happens here:
> [RecordHeader.java:45|https://github.com/apache/kafka/blob/2.8.0/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java#L45]:
> {code:java}
> public String key() {
>     if (key == null) {
>         key = Utils.utf8(keyBuffer, keyBuffer.remaining()); // NPE here
>         keyBuffer = null;
>     }
>     return key;
> }
> {code}
> h2. When/why
> The issue was introduced by the changes in KAFKA-10438, which avoid
> unnecessary creation of the key's {{String}} when it might never be used.
> It is a good optimization, but this *lazy* initialization of the field
> {{RecordHeader.key}} creates a problem when it is accessed/initialized by 2
> threads concurrently: it is no longer a read-only operation, and there is a
> race between initializing {{key}} and nullifying {{keyBuffer}}.
> h2. Simple workaround
> Upon consuming record(s) and before passing {{ConsumerRecord}} to multiple
> processing threads, eagerly initialize all header keys by iterating through
> the headers and invoking {{key()}}, or even
> {{ConsumerRecord.headers().hashCode()}}, which will initialize all keys
> (and header values too).
> h2. Consequences
> The current implementation makes {{RecordHeader}} not thread-safe even for
> read-only access.
> h2. Reproducibility
> With enough iterations it's always possible to reproduce (at least on my
> local machine).
> Here is a minimal snippet to reproduce:
> {code:java}
> @Test
> public void testConcurrentKeyInit() throws ExecutionException, InterruptedException {
>     ByteBuffer keyBuffer = ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8));
>     ByteBuffer valueBuffer = ByteBuffer.wrap("value".getBytes(StandardCharsets.UTF_8));
>     ExecutorService executorService = Executors.newSingleThreadExecutor();
>     try {
>         for (int i = 0; i < 1_000_000; i++) {
>             RecordHeader header = new RecordHeader(keyBuffer, valueBuffer);
>             Future<String> future = executorService.submit(header::key);
>             assertEquals("key", header.key());
>             assertEquals("key", future.get());
>         }
>     } finally {
>         executorService.shutdown();
>     }
> }
> {code}
> h2. Possible solution #1
> Leave the implementation as-is, but document this behaviour for users.
> h2. Possible solution #2
> Add some concurrency primitives to the current implementation:
> * Simply adding {{synchronized}} to the method {{key()}} (and to
> {{value()}} too) gives correct behaviour, avoiding race conditions.
> * A JMH benchmark comparing {{key()}} with and without {{synchronized}}
> showed no significant performance penalty:
> {code}
> Benchmark                              Mode  Cnt   Score   Error  Units
> RecordHeaderBenchmark.key              avgt   15  31.308 ± 7.862  ns/op
> RecordHeaderBenchmark.synchronizedKey  avgt   15  31.853 ± 7.096  ns/op
> {code}
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
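
For reference, a minimal sketch of the synchronized variant discussed in the comment and in "Possible solution #2". It does not use the Kafka classes: {{SynchronizedLazyHeader}} and {{raceKeyAccess}} are hypothetical names made up for illustration, and the decoding uses the JDK's {{StandardCharsets.UTF_8.decode}} instead of {{Utils.utf8}}. It only shows the shape of the idea (check, decode, and nulling of the buffer done under one lock), not the actual patch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for RecordHeader (NOT the Kafka class).
final class SynchronizedLazyHeader {
    private ByteBuffer keyBuffer; // set to null once the key has been decoded
    private String key;           // decoded lazily on first access

    SynchronizedLazyHeader(ByteBuffer keyBuffer) {
        this.keyBuffer = keyBuffer;
    }

    // synchronized: the null check, the decode, and the nulling of keyBuffer
    // happen as one step, so no thread can see key == null with keyBuffer == null.
    public synchronized String key() {
        if (key == null) {
            // duplicate() so the caller's buffer position is left untouched
            key = StandardCharsets.UTF_8.decode(keyBuffer.duplicate()).toString();
            keyBuffer = null;
        }
        return key;
    }

    // Races two threads on key() for `iterations` fresh headers, mirroring the
    // reproduction test in the ticket. Returns the number of failures observed.
    static int raceKeyAccess(int iterations) {
        byte[] bytes = "key".getBytes(StandardCharsets.UTF_8);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        int failures = 0;
        try {
            for (int i = 0; i < iterations; i++) {
                SynchronizedLazyHeader header =
                        new SynchronizedLazyHeader(ByteBuffer.wrap(bytes));
                Future<String> future = executor.submit(header::key);
                try {
                    if (!"key".equals(header.key()) || !"key".equals(future.get())) {
                        failures++;
                    }
                } catch (Exception e) {
                    failures++; // e.g. an NPE surfaced through the Future
                }
            }
        } finally {
            executor.shutdown();
        }
        return failures;
    }
}
```

Calling {{SynchronizedLazyHeader.raceKeyAccess(n)}} with a large {{n}} should report zero failures. Removing the {{synchronized}} keyword from {{key()}} in this sketch brings back the same kind of race the ticket describes, although how often it shows up depends on the machine.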