[ 
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16303368#comment-16303368
 ] 

Matthias J. Sax commented on KAFKA-6378:
----------------------------------------

I understand that there is not {{null}} key in the stream or table -- but we 
are reusing the same code as for stream-table join here... But as said already, 
I agree that we could change the behavior for stream-globalTable joins. On the 
other hand, you can also just return any sentinel value instead of {{null}} for 
which you know that it won't join as a potential workaround. Your use case 
seems to be a little special, as usually one cannot tell from the stream record 
itself it is might join or not, and always return the value you want to join 
on. For your case, you try to piggy back a filter into the {{KeyValueMapper}} 
and this is out of scope of the design.

With regard to Java8: as long as Kafka supports Java7 we need to build a 
solution that works there -- moving to Java8 is planned and we can improve the 
code than.

> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper 
> returns null
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6378
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6378
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andy Bryant
>
> On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the 
> stream fails with a NullPointerException (see stacktrace below). On Kafka 
> 0.11.0.0 the stream processes this successfully, calling the ValueJoiner with 
> the table value set to null.
> The use-case for this is joining a stream to a table containing reference 
> data where the stream foreign key may be null. There is no straight-forward 
> workaround in this case with Kafka 1.0.0 without having to resort to either 
> generating a key that will never match or branching the stream for records 
> that don't have the foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" 
> java.lang.NullPointerException
>       at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
>       at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
>       at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>       at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>       at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>       at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to