[ https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346626#comment-16346626 ]
ASF GitHub Bot commented on KAFKA-6378:
---------------------------------------

andybryant opened a new pull request #4494: KAFKA-6378 KStream-GlobalKTable null KeyValueMapper handling
URL: https://github.com/apache/kafka/pull/4494

For KStream-GlobalKTable joins let `null` `KeyValueMapper` results indicate no match

For KStream-GlobalKTable joins, a `KeyValueMapper` is used to derive, from each stream record, the key to look up in the `GlobalKTable`. Some stream values may hold no valid reference into the table. This patch lets developers return `null` from the mapper to indicate that no match is possible. This is safe here because `null` is never a valid key for a `GlobalKTable`. Without this patch, returning `null` from the mapper caused the stream to crash on Kafka 1.0.

I added unit tests for KStream-GlobalKTable left and inner joins, since they were missing. I also covered the additional scenario where the `KeyValueMapper` returns `null`, to ensure it is handled correctly.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6378
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6378
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andy Bryant
>            Priority: Major
>             Fix For: 1.0.1
>
> On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the
> stream fails with a NullPointerException (see stacktrace below). On Kafka
> 0.11.0.0 the stream processes this successfully, calling the ValueJoiner with
> the table value set to null.
> The use-case for this is joining a stream to a table containing reference
> data where the stream foreign key may be null. There is no straight-forward
> workaround in this case with Kafka 1.0.0 without having to resort to either
> generating a key that will never match or branching the stream for records
> that don't have the foreign key.
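
The first workaround mentioned in the description, generating a key that will never match, can be sketched in plain Java. This is only a model under stated assumptions: a `Map` stands in for the `GlobalKTable`, and `Order`, `NO_MATCH_KEY`, `lookup`, `leftJoin` and `demo` are hypothetical names, not Kafka Streams API. The sentinel is assumed to be a value that can never occur as a real table key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;

// Plain-Java model of the "key that will never match" workaround.
// Every name below is hypothetical; none of this is Kafka Streams API.
final class SentinelKeyWorkaround {

    // Assumed sentinel: a value that no real table key can ever take.
    static final String NO_MATCH_KEY = "__NO_MATCH__";

    // A stream value whose foreign key may be absent.
    static final class Order {
        final String id;
        final String customerId; // null when the order has no customer reference

        Order(String id, String customerId) {
            this.id = id;
            this.customerId = customerId;
        }
    }

    // Stands in for the table lookup. Like the store in the reported trace,
    // it rejects null keys instead of treating them as "no value".
    static String lookup(Map<String, String> table, String key) {
        Objects.requireNonNull(key, "key");
        return table.get(key);
    }

    // Left join of a single record: the joiner sees null when nothing matches.
    static String leftJoin(Map<String, String> table, Order order,
                           BiFunction<String, Order, String> keyMapper,
                           BiFunction<Order, String, String> joiner) {
        String tableKey = keyMapper.apply(order.id, order);
        return joiner.apply(order, lookup(table, tableKey));
    }

    // Joins one order against a one-entry reference table using the workaround mapper.
    static String demo(String customerId) {
        Map<String, String> customers = new HashMap<>();
        customers.put("c1", "Alice");

        // Workaround: never return null from the mapper; substitute the sentinel.
        BiFunction<String, Order, String> keyMapper =
            (key, order) -> order.customerId != null ? order.customerId : NO_MATCH_KEY;
        BiFunction<Order, String, String> joiner =
            (order, customer) -> order.id + ":" + customer;

        return leftJoin(customers, new Order("o1", customerId), keyMapper, joiner);
    }

    public static void main(String[] args) {
        System.out.println(demo("c1")); // prints "o1:Alice"
        System.out.println(demo(null)); // prints "o1:null" (joiner got a null table value)
    }
}
```

The cost of this approach is that the sentinel has to be kept out of the real key space by convention, which is presumably why the report does not consider it a straightforward workaround.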
> Exception in thread "workshop-simple-example-client-StreamThread-1" java.lang.NullPointerException
>     at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
>     at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
>     at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
>     at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>     at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
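
In the trace, the null key travels from the join processor into the store's `get`, where `Objects.requireNonNull` rejects it. A rough plain-Java model of the behaviour the pull request describes, where a `null` mapped key means "no match", might look like the following. It is a sketch and not the Kafka Streams source: `NullKeyJoinModel`, `storeGet`, `join` and `demo` are hypothetical names, a `Map` stands in for the state store, and the inner-join behaviour (emit nothing when there is no match) is assumed from ordinary join semantics.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;

// Plain-Java model of treating a null mapped key as "no match".
// Hypothetical names throughout; this is not the Kafka Streams source.
final class NullKeyJoinModel {

    // Mimics the failing frame in the trace: the store rejects null keys.
    static String storeGet(Map<String, String> store, String key) {
        Objects.requireNonNull(key);
        return store.get(key);
    }

    // Joins one record. A null mapped key skips the store lookup entirely:
    // a left join still calls the joiner with a null table value,
    // an inner join emits nothing.
    static Optional<String> join(Map<String, String> store,
                                 String streamKey, String streamValue,
                                 BiFunction<String, String, String> keyMapper,
                                 BiFunction<String, String, String> joiner,
                                 boolean leftJoin) {
        String mappedKey = keyMapper.apply(streamKey, streamValue);
        String tableValue = mappedKey == null ? null : storeGet(store, mappedKey);
        if (leftJoin || tableValue != null) {
            return Optional.of(joiner.apply(streamValue, tableValue));
        }
        return Optional.empty();
    }

    // Joins one value against a one-entry store; an empty value has no reference.
    static Optional<String> demo(String streamValue, boolean leftJoin) {
        Map<String, String> store = new HashMap<>();
        store.put("c1", "Alice");

        // The mapper returns null when the value carries no usable reference.
        BiFunction<String, String, String> keyMapper =
            (key, value) -> value.isEmpty() ? null : value;
        BiFunction<String, String, String> joiner =
            (value, tableValue) -> value + "->" + tableValue;

        return join(store, "k", streamValue, keyMapper, joiner, leftJoin);
    }

    public static void main(String[] args) {
        System.out.println(demo("c1", true));  // prints "Optional[c1->Alice]"
        System.out.println(demo("", true));    // prints "Optional[->null]"
        System.out.println(demo("", false));   // prints "Optional.empty"
    }
}
```

Removing the `mappedKey == null` check in `join` makes `demo("", true)` throw a `NullPointerException` from `storeGet`, which mirrors the shape of the failure reported above.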