[ https://issues.apache.org/jira/browse/KAFKA-10277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17166211#comment-17166211 ]

Joel Wee commented on KAFKA-10277:
----------------------------------

[~mjsax] Could I pick this up?

At the moment, it also looks like there's no check to enforce that the 
{{keySelector}} returns a non-null join key. So I think it makes sense to relax 
the requirement on the record key and enforce it on the {{keySelector}} result instead.
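For illustration, a minimal sketch of why the record key is irrelevant to the lookup: the made-up {{Order}} value type, the {{KEY_SELECTOR}} and the in-memory {{Map}} standing in for the GlobalKTable are all hypothetical (not Kafka Streams API). The join key is derived from the value alone, so a stream record with a null key still resolves a non-null join key.

{code:java}
import java.util.Map;
import java.util.function.BiFunction;

public class KeySelectorExample {
    // Hypothetical stream value type: an order that carries its own customer id.
    static final class Order {
        final String customerId;
        final double amount;

        Order(String customerId, double amount) {
            this.customerId = customerId;
            this.amount = amount;
        }
    }

    // Hypothetical keySelector: derives the join key from the value only,
    // so the stream record key is never consulted and may be null.
    static final BiFunction<String, Order, String> KEY_SELECTOR =
        (key, order) -> order == null ? null : order.customerId;

    // Stand-in for the GlobalKTable contents (customerId -> customer name).
    static final Map<String, String> CUSTOMERS = Map.of("c-1", "Alice");

    // Returns the matched customer name, or null when there is no join key or no match.
    static String lookup(String recordKey, Order order) {
        String joinKey = KEY_SELECTOR.apply(recordKey, order);
        return joinKey == null ? null : CUSTOMERS.get(joinKey);
    }

    public static void main(String[] args) {
        // A stream record with a null key still finds its table entry.
        System.out.println(lookup(null, new Order("c-1", 9.99))); // prints "Alice"
    }
}
{code}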

From the code, it looks like all of this is done in the {{process}} method in 
{{KStreamKTableJoinProcessor.java}}, so the change should be as simple as 
changing the {{key == null}} check there to 
{{keyMapper.apply(key, value) == null}} (the default keyMapper just returns 
the key) and then adding some tests. Does that sound right? The method is copied 
below for reference:
{code:java}
    public void process(final K1 key, final V1 value) {
        if (key == null || value == null) { // Change key == null to keyMapper.apply(key, value) == null
            LOG.warn(
                "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                key, value, context().topic(), context().partition(), context().offset()
            );
            droppedRecordsSensor.record();
        } else {
            final K2 mappedKey = keyMapper.apply(key, value);
            final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey)); // Can remove this null check
            if (leftJoin || value2 != null) {
                context().forward(key, joiner.apply(value, value2));
            }
        }
    }
{code}
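To sanity-check the proposal, here is a simplified, Kafka-free model of the method above (my own sketch, not the real class). The {{keyMapper}} (join key = value prefix before {{':'}}), the {{Map}} standing in for the GlobalKTable, and the string-concatenating joiner are all hypothetical. {{processCurrent}} mirrors the existing checks; {{processProposed}} drops on a null value or a null mapped key, and calls the keyMapper once, after the value check, so it never sees a null value.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class JoinCheckModel {
    // Hypothetical keyMapper: join key is the part of the value before ':',
    // or null when there is no ':' (mimicking a keySelector that returns null).
    static String keyMapper(String key, String value) {
        int idx = value.indexOf(':');
        return idx < 0 ? null : value.substring(0, idx);
    }

    // Stand-in for the GlobalKTable: join key -> table value.
    static final Map<String, String> TABLE = Map.of("a", "A");

    // Current checks: drop on null record key or value; a null mapped key is just a failed lookup.
    static List<String> processCurrent(String key, String value, boolean leftJoin) {
        List<String> out = new ArrayList<>();
        if (key == null || value == null) {
            return out; // dropped (the real processor logs a warning and records a metric)
        }
        String mappedKey = keyMapper(key, value);
        String value2 = mappedKey == null ? null : TABLE.get(mappedKey);
        if (leftJoin || value2 != null) {
            out.add(key + "=" + value + "+" + value2); // stand-in for joiner.apply(value, value2)
        }
        return out;
    }

    // Proposed checks: the record key may be null; drop on null value or null mapped key.
    static List<String> processProposed(String key, String value, boolean leftJoin) {
        List<String> out = new ArrayList<>();
        String mappedKey = value == null ? null : keyMapper(key, value);
        if (value == null || mappedKey == null) {
            return out; // dropped
        }
        String value2 = TABLE.get(mappedKey); // no null check needed on mappedKey any more
        if (leftJoin || value2 != null) {
            out.add(key + "=" + value + "+" + value2);
        }
        return out;
    }

    public static void main(String[] args) {
        // Null record key, valid join key: dropped today, joined under the proposal.
        System.out.println(processCurrent(null, "a:x", false));  // prints "[]"
        System.out.println(processProposed(null, "a:x", false)); // prints "[null=a:x+A]"
        // Non-null record key, null join key (left join): forwarded today, dropped under the proposal.
        System.out.println(processCurrent("k", "nokey", true));  // prints "[k=nokey+null]"
        System.out.println(processProposed("k", "nokey", true)); // prints "[]"
    }
}
{code}

One consequence worth flagging: for left joins, a record whose keySelector returns null is currently forwarded with a null table value, while with the proposed check it would be dropped, so this is a behaviour change and not only a relaxation.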

> Relax non-null key requirement for KStream-GlobalKTable joins
> -------------------------------------------------------------
>
>                 Key: KAFKA-10277
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10277
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Major
>              Labels: beginner, newbie
>
> In general, Kafka Streams requires that key and value are non-null for 
> aggregations and join input records. While this requirement is reasonable in 
> general, for KStream-GlobalKTable joins it's questionable for the stream 
> input record key. The join is based on a `keySelector` and it seems to be 
> sufficient to require that the `keySelector` returns a non-null join key for 
> the stream record.
> We should consider relaxing the non-null key requirement for stream-side 
> input records.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)