[ 
https://issues.apache.org/jira/browse/KAFKA-12999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17370152#comment-17370152
 ] 

Ismael Juma commented on KAFKA-12999:
-------------------------------------

Note that biased locking has been deprecated and will be removed in JDK 18 or 
soon after, so even uncontended synchronized blocks will have a cost. We should 
do what I suggested and use synchronized _only_ during initialization.
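
A minimal sketch of what synchronizing only during initialization could look 
like, using double-checked locking. LazyKeyHeader is a hypothetical stand-in 
for RecordHeader, and decoding through StandardCharsets.UTF_8 replaces 
Utils.utf8 so that the example is self-contained. It illustrates the idea and 
is not the actual patch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for RecordHeader; not the Kafka class.
final class LazyKeyHeader {
    // volatile so that readers which skip the lock still see a fully published key
    private volatile String key;
    // read and cleared only while holding the lock
    private ByteBuffer keyBuffer;

    LazyKeyHeader(ByteBuffer keyBuffer) {
        this.keyBuffer = keyBuffer;
    }

    String key() {
        String k = key;                // fast path: one volatile read, no lock
        if (k == null) {
            synchronized (this) {      // slow path: taken only until the key is set
                k = key;               // re-check, another thread may have won the race
                if (k == null) {
                    // duplicate() leaves the caller's buffer position untouched
                    k = StandardCharsets.UTF_8.decode(keyBuffer.duplicate()).toString();
                    key = k;
                    keyBuffer = null;  // safe: no other thread can be inside this block
                }
            }
        }
        return k;
    }
}
```

Once the key is set, callers pay only for a volatile read and never enter the 
synchronized block.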

> NPE when accessing RecordHeader.key() concurrently
> --------------------------------------------------
>
>                 Key: KAFKA-12999
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12999
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.8.0
>            Reporter: Antonio Tomac
>            Priority: Minor
>
> h2. Summary
> After upgrading clients to {{2.8.0}}, reading a {{ConsumerRecord}}'s header 
> keys started to throw an occasional {{java.lang.NullPointerException}} when 
> the record is accessed concurrently from multiple (2) threads.
> h2. Where
> NPE happens here 
> [RecordHeader.java:45|https://github.com/apache/kafka/blob/2.8.0/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java#L45]:
> {code:java}
> public String key() {
>     if (key == null) {
>         key = Utils.utf8(keyBuffer, keyBuffer.remaining()); // NPE here 
>         keyBuffer = null;
>     }
>     return key;
> }
> {code}
> h2. When/why
> The issue was introduced by the changes in KAFKA-10438, which avoid creating 
> the key's {{String}} when it might never be used.
>  It is a good optimization, but this *lazy* initialization of the field 
> {{RecordHeader.key}} creates a problem when it is accessed/initialized by 2 
> threads concurrently: {{key()}} is no longer a read-only operation, and there 
> is a race between initializing {{key}} and nullifying {{keyBuffer}}. One 
> thread can pass the {{key == null}} check and then dereference {{keyBuffer}} 
> after the other thread has already set it to {{null}}.
> h2. Simple workaround
> After consuming record(s) and before passing a {{ConsumerRecord}} to multiple 
> processing threads, eagerly initialize all header keys by iterating through 
> the headers and invoking {{key()}}, or call 
> {{ConsumerRecord.headers().hashCode()}}, which will initialize all keys (and 
> header values too).
> h2. Consequences
> The current implementation makes {{RecordHeader}} not thread-safe even for 
> read-only access.
> h2. Reproducibility
> With enough iterations it is always possible to reproduce (at least on my 
> local machine).
>  Here is a minimal snippet to reproduce:
> {code:java}
> @Test
> public void testConcurrentKeyInit() throws ExecutionException, InterruptedException {
>     ByteBuffer keyBuffer = ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8));
>     ByteBuffer valueBuffer = ByteBuffer.wrap("value".getBytes(StandardCharsets.UTF_8));
>     ExecutorService executorService = Executors.newSingleThreadExecutor();
>     try {
>         for (int i = 0; i < 1_000_000; i++) {
>             RecordHeader header = new RecordHeader(keyBuffer, valueBuffer);
>             // The executor thread and the test thread race on the same header's key().
>             Future<String> future = executorService.submit(header::key);
>             assertEquals("key", header.key());
>             // An NPE thrown on the executor thread surfaces here as an ExecutionException.
>             assertEquals("key", future.get());
>         }
>     } finally {
>         executorService.shutdown();
>     }
> }
> {code}
> h2. Possible solution #1
> Leave the implementation as-is but document the limitation for users.
> h2. Possible solution #2
> Add some concurrency primitives to the current implementation:
> * Simply adding {{synchronized}} to the method {{key()}} (and to {{value()}} 
> too) gives correct behaviour and avoids the race condition.
> * A JMH benchmark comparing {{key()}} with and without {{synchronized}} showed 
> no significant performance penalty:
> {code}
> Benchmark                              Mode  Cnt   Score   Error  Units
> RecordHeaderBenchmark.key              avgt    15  31.308 ± 7.862  ns/op
> RecordHeaderBenchmark.synchronizedKey  avgt    15  31.853 ± 7.096  ns/op
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
