[ 
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346626#comment-16346626
 ] 

ASF GitHub Bot commented on KAFKA-6378:
---------------------------------------

andybryant opened a new pull request #4494: KAFKA-6378 KStream-GlobalKTable 
null KeyValueMapper handling
URL: https://github.com/apache/kafka/pull/4494
 
 
   For KStream-GlobalKTable joins let `null` `KeyValueMapper` results indicate 
no match
   
   For KStream-GlobalKTable joins, a `KeyValueMapper` is used to derive a key 
from the stream records into the `GlobalKTable`. For some stream values there 
may be no valid reference to the table stream. This patch allows developers to 
use `null` return values to indicate there is no possible match. This is 
possible in this case since `null` is never a valid key value for a 
`GlobalKTable`.
   Without this patch, providing a `null` value caused the stream to crash on 
Kafka 1.0.
   
   I added unit tests for KStream-GlobalKTable left and inner joins, since they 
were missing. I also covered this additional scenario where `KeyValueMapper` 
returns `null` to insure it is handled correctly.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper 
> returns null
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6378
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6378
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andy Bryant
>            Priority: Major
>             Fix For: 1.0.1
>
>
> On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the 
> stream fails with a NullPointerException (see stacktrace below). On Kafka 
> 0.11.0.0 the stream processes this successfully, calling the ValueJoiner with 
> the table value set to null.
> The use-case for this is joining a stream to a table containing reference 
> data where the stream foreign key may be null. There is no straight-forward 
> workaround in this case with Kafka 1.0.0 without having to resort to either 
> generating a key that will never match or branching the stream for records 
> that don't have the foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" 
> java.lang.NullPointerException
>       at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
>       at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
>       at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>       at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>       at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>       at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to