[ https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16302605#comment-16302605 ]

Andy Bryant edited comment on KAFKA-6378 at 12/23/17 10:52 PM:
---------------------------------------------------------------

I'm using {{null}} here to indicate there is no possible match for the stream 
record in the GlobalKTable. Another option would be to make this more explicit, 
say by returning an {{Optional}} from the {{KeyValueMapper}}. However, if 
{{null}} is never a valid key for a table or GlobalKTable, you could explicitly 
support it as a return value that indicates no match, rather than crashing the 
stream.
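To make the {{Optional}} alternative concrete, here is a rough plain-Java model of the lookup with no Kafka dependency. The class and method names are hypothetical, a {{Map}} stands in for the GlobalKTable's store, and a key mapper returning {{Optional<K>}} is only the suggestion above, not an existing Kafka Streams signature. An empty {{Optional}} means "no possible match", so the table is never queried and the joiner gets {{null}} as the table value, the same as a left join with no matching row.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical model of a stream-to-global-table left join.
// The Map stands in for the GlobalKTable store; this is NOT the Kafka Streams API.
final class OptionalKeyLeftJoin {

    // Joins one stream value against the table. keyMapper returns an empty
    // Optional when the record has no foreign key, so no lookup is attempted.
    static <K, V, TV, R> R leftJoin(V streamValue,
                                     Map<K, TV> table,
                                     Function<V, Optional<K>> keyMapper,
                                     BiFunction<V, TV, R> joiner) {
        TV tableValue = keyMapper.apply(streamValue)
                .map(table::get)   // key present: look it up (the row may still be missing)
                .orElse(null);     // no key or no row: left-join semantics, table value is null
        return joiner.apply(streamValue, tableValue);
    }
}
```

With a table holding only {{NZ -> New Zealand}}, a record keyed "NZ" joins to "New Zealand", while both an empty key and an unknown key such as "AU" reach the joiner with a {{null}} table value instead of failing.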

Mapping the stream records to replace {{null}} references with a dummy 
'sentinel' value smells too, especially when you don't control the key values 
and can't be sure the sentinel you picked is really an 'invalid' key.
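The risk with a sentinel fits in a few lines: if the chosen "impossible" key ever turns up as a real key in the table, records that have no foreign key silently join to that row. The sentinel {{"UNKNOWN"}}, the class name and the table contents below are made up purely for illustration.

```java
import java.util.Map;

// Hypothetical illustration: replacing a missing foreign key with a sentinel
// only works while the sentinel never appears as a real table key.
final class SentinelKeyRisk {

    static final String SENTINEL = "UNKNOWN"; // assumed to be "never a real key"

    // Substitutes the sentinel for a null foreign key, then looks it up.
    static String lookup(String foreignKey, Map<String, String> table) {
        String key = (foreignKey == null) ? SENTINEL : foreignKey;
        return table.get(key);
    }
}
```

Against a table without the sentinel, a record with no foreign key correctly finds nothing; once some upstream producer writes a row keyed "UNKNOWN", the same record suddenly matches it, and nothing in the stream code can detect that.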

Not having a simple way to indicate that there is no match is a real issue for 
us, as this pattern has come up at several clients who want to replicate 
SQL-view-like behaviour in Kafka Streams by joining on a nullable foreign key.
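For comparison, the branching workaround from the issue description can be modelled in plain Java: split the records on whether the foreign key is present, look up only those that have one, pass the rest to the joiner with a {{null}} table value, then merge the two branches. Lists stand in for streams and a {{Map}} for the GlobalKTable; all names are hypothetical, and this is a sketch of the idea rather than how Kafka Streams' own branch and merge operators work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical model of the "branch on a missing foreign key" workaround.
// Lists stand in for streams and a Map stands in for the GlobalKTable.
final class BranchWorkaround {

    static <V, K, TV, R> List<R> leftJoinNullableKey(List<V> stream,
                                                     Map<K, TV> table,
                                                     Function<V, K> keyMapper,
                                                     BiFunction<V, TV, R> joiner) {
        List<R> joined = new ArrayList<>();
        List<R> passThrough = new ArrayList<>();
        for (V value : stream) {
            K key = keyMapper.apply(value);
            if (key == null) {
                // Branch 1: no foreign key, so the table is never queried.
                passThrough.add(joiner.apply(value, null));
            } else {
                // Branch 2: ordinary left join; a missing row also yields null.
                joined.add(joiner.apply(value, table.get(key)));
            }
        }
        // Merge the branches; with real streams there is no ordering
        // guarantee between records from the two branches.
        List<R> merged = new ArrayList<>(joined);
        merged.addAll(passThrough);
        return merged;
    }
}
```

It gives the SQL-style result, but every topology with a nullable foreign key has to repeat this split-and-merge, which is the boilerplate a supported "no match" return value would avoid.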


was (Author: kiwiandy):
I'm using {{null}} here to indicate there is no possible match for this record. 
Another option would be to make this more explicit, say by returning an 
{{Optional}} from the {{KeyValueMapper}}. However, if {{null}} is never a valid 
key for a table or GlobalKTable, you could explicitly support it as a return 
value that indicates no match, rather than crashing the stream.

Not having a simple way to indicate that there is no match is a real issue for 
us, as this pattern has come up at several clients who want to replicate 
SQL-view-like behaviour in Kafka Streams by joining on a nullable foreign key.

> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper 
> returns null
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6378
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6378
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andy Bryant
>
> On a Stream->GlobalKTable leftJoin, if the KeyValueMapper returns null, the 
> stream fails with a NullPointerException (see stack trace below). On Kafka 
> 0.11.0.0 the stream processes this successfully, calling the ValueJoiner with 
> the table value set to null.
> The use case for this is joining a stream to a table containing reference 
> data where the stream's foreign key may be null. There is no straightforward 
> workaround in Kafka 1.0.0 short of either generating a key that will never 
> match or branching the stream for records that don't have the foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" java.lang.NullPointerException
>       at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>       at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
>       at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
>       at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
>       at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
>       at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
>       at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
>       at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>       at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>       at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>       at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>       at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>       at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>       at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>       at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>       at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
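The trace shows the null key travelling from the join processor down to {{CachingKeyValueStore.get}}, where {{Objects.requireNonNull}} rejects it. A guard of roughly the following shape in the join step (skip the store lookup when the mapped key is {{null}} and call the joiner with a {{null}} table value) would reproduce the 0.11.0.0 behaviour described above. This is a stdlib-only model: {{StrictStore}} and {{joinStep}} are hypothetical stand-ins, not the actual {{KStreamKTableJoinProcessor}} code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;

// Hypothetical stand-ins for the classes in the stack trace; not Kafka code.
final class NullKeyGuard {

    // Mimics a store whose get() rejects null keys, like the one in the trace.
    static final class StrictStore<K, V> {
        private final Map<K, V> data = new HashMap<>();

        void put(K key, V value) {
            data.put(Objects.requireNonNull(key, "key cannot be null"), value);
        }

        V get(K key) {
            Objects.requireNonNull(key, "key cannot be null");
            return data.get(key);
        }
    }

    // Join step with a null guard: a null key means "no possible match",
    // so the store is never queried and the joiner sees a null table value.
    static <K, V, TV, R> R joinStep(K key, V streamValue,
                                    StrictStore<K, TV> store,
                                    BiFunction<V, TV, R> joiner) {
        TV tableValue = (key == null) ? null : store.get(key);
        return joiner.apply(streamValue, tableValue);
    }
}
```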



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)