[ 
https://issues.apache.org/jira/browse/KAFKA-12999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17370136#comment-17370136
 ] 

Ismael Juma commented on KAFKA-12999:
-------------------------------------

Thanks for the ticket. This class was never meant to be thread safe. The value 
was already initialized lazily before this change, and it just so happened that 
the key was not. Since the consumer is single threaded, this was deemed OK.

For now, the easiest path is for you to copy the data you care about to your 
own thread-safe class. We could consider making ConsumerRecord thread safe, but 
it would require a KIP and it would probably only be done in a feature release.
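
A rough sketch of what such a copy could look like follows. The names 
HeaderSnapshotSketch and HeaderSnapshot are made up for illustration, and the 
nested Header interface is only a stand-in for 
org.apache.kafka.common.header.Header so that the example compiles without the 
Kafka client on the classpath. It is not part of the Kafka API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class HeaderSnapshotSketch {

    // Stand-in for org.apache.kafka.common.header.Header (assumption: only key() and value() are needed).
    public interface Header {
        String key();
        byte[] value();
    }

    // Hypothetical immutable copy: every field is final and assigned once in the constructor,
    // so instances can be shared between threads without further synchronization.
    public static final class HeaderSnapshot {
        private final String key;
        private final byte[] value;

        public HeaderSnapshot(String key, byte[] value) {
            this.key = key;
            // Defensive copy so later changes to the source array are not visible here.
            this.value = value == null ? null : value.clone();
        }

        public String key() {
            return key;
        }

        public byte[] value() {
            // Return a copy so callers cannot mutate the shared state.
            return value == null ? null : value.clone();
        }
    }

    // Intended to run on the consumer thread, before anything is handed to other threads.
    public static List<HeaderSnapshot> snapshot(Iterable<? extends Header> headers) {
        List<HeaderSnapshot> copy = new ArrayList<>();
        for (Header h : headers) {
            // key() and value() are called here, on a single thread, which forces the lazy decoding.
            copy.add(new HeaderSnapshot(h.key(), h.value()));
        }
        return Collections.unmodifiableList(copy);
    }
}
```

With the real client, the argument would presumably be the result of 
ConsumerRecord.headers(), and only the returned list would be passed to the 
worker threads.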

> NPE when accessing RecordHeader.key() concurrently
> --------------------------------------------------
>
>                 Key: KAFKA-12999
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12999
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.8.0
>            Reporter: Antonio Tomac
>            Priority: Minor
>
> h2. Summary
> After upgrading clients to {{2.8.0}}, reading {{ConsumerRecord}} header 
> keys started to throw an occasional {{java.lang.NullPointerException}} when 
> the record is accessed concurrently from multiple (2) threads.
> h2. Where
> NPE happens here 
> [RecordHeader.java:45|https://github.com/apache/kafka/blob/2.8.0/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java#L45]:
> {code:java}
> public String key() {
>     if (key == null) {
>         key = Utils.utf8(keyBuffer, keyBuffer.remaining()); // NPE here 
>         keyBuffer = null;
>     }
>     return key;
> }
> {code}
> h2. When/why
> The issue was introduced by the changes in KAFKA-10438, which avoid creating 
> the key's *{color:#0747a6}{{String}}{color}* when it might never be used.
>  It is a good optimization, but this *lazy* initialization of the field 
> {{RecordHeader.key}} creates a problem when two threads access/initialize it 
> concurrently: {{key()}} is no longer a read-only operation, and there is a 
> race between initializing {color:#0747a6}*{{key}}*{color} and nulling 
> {color:#0747a6}*{{keyBuffer}}*{color}. One thread can pass the 
> {{key == null}} check and then dereference {{keyBuffer}} after the other 
> thread has already set it to {{null}}.
> h2. Simple workaround
> Upon consuming record(s) and before passing 
> {color:#0747a6}*{{ConsumerRecord}}*{color} to multiple processing threads, 
> eagerly initialize all header keys by iterating through headers and invoking 
> {color:#0747a6}*{{key()}}*{color} or even 
> {color:#0747a6}*{{ConsumerRecord.headers().hashCode()}}*{color} which will 
> initialize all keys (and header values too)
> h2. Consequences
> Current implementation renders RecordHeader not thread-safe for read-only 
> access.
> h2. Reproducibility
> With enough iterations it is always possible to reproduce (at least on my 
> local machine).
>  Here is a minimal snippet to reproduce:
> {code:java}
> @Test
> public void testConcurrentKeyInit() throws ExecutionException, InterruptedException {
>     ByteBuffer keyBuffer = ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8));
>     ByteBuffer valueBuffer = ByteBuffer.wrap("value".getBytes(StandardCharsets.UTF_8));
>     ExecutorService executorService = Executors.newSingleThreadExecutor();
>     try {
>         for (int i = 0; i < 1_000_000; i++) {
>             RecordHeader header = new RecordHeader(keyBuffer, valueBuffer);
>             Future<String> future = executorService.submit(header::key);
>             assertEquals("key", header.key());
>             assertEquals("key", future.get());
>         }
>     } finally {
>         executorService.shutdown();
>     }
> }
> {code}
> h2. Possible solution #1
> Leave the implementation as-is but document the limitation for users.
> h2. Possible solution #2
> Add some concurrency primitives to the current implementation
> * Simply adding {color:#0747a6}*{{synchronized}}*{color} to the method 
> *{color:#0747a6}{{key()}}{color}* (and to *{color:#0747a6}{{value()}}{color}* 
> too) gives correct behaviour and avoids the race condition.
> * A JMH benchmark comparing *{color:#0747a6}{{key()}}{color}* with and without 
> {color:#0747a6}*{{synchronized}}*{color} showed no significant performance 
> penalty
> {code}
> Benchmark                              Mode  Cnt   Score   Error  Units
> RecordHeaderBenchmark.key              avgt    15  31.308 ± 7.862  ns/op
> RecordHeaderBenchmark.synchronizedKey  avgt    15  31.853 ± 7.096  ns/op
> {code}
>  
>  
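
For reference, the shape of "Possible solution #2" quoted above can be sketched 
without Kafka on the classpath. SynchronizedLazyKey is a made-up stand-in, not 
the real RecordHeader, and it decodes with StandardCharsets.UTF_8 instead of 
Kafka's Utils.utf8; it only illustrates how synchronized closes the window 
between the null check and the clearing of keyBuffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for RecordHeader, reduced to the lazily decoded key.
public class SynchronizedLazyKey {
    private ByteBuffer keyBuffer; // set to null once the key has been decoded
    private String key;           // decoded on first access

    public SynchronizedLazyKey(ByteBuffer keyBuffer) {
        this.keyBuffer = keyBuffer;
    }

    // synchronized makes the null check, the decode and the clearing of keyBuffer
    // one critical section, so a second thread can never observe key == null
    // together with keyBuffer == null.
    public synchronized String key() {
        if (key == null) {
            // duplicate() leaves the position of the caller's buffer untouched
            key = StandardCharsets.UTF_8.decode(keyBuffer.duplicate()).toString();
            keyBuffer = null;
        }
        return key;
    }
}
```

This trades a monitor acquisition on every call for correctness under 
concurrent reads, which is the trade-off the benchmark in the ticket measures.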



