[
https://issues.apache.org/jira/browse/KAFKA-10277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17166211#comment-17166211
]
Joel Wee commented on KAFKA-10277:
----------------------------------
[~mjsax] Could I pick this up?
At the moment, it also looks like there's no check to enforce that the
{{keySelector}} returns a non-null join key. So I think it makes sense to relax
the non-null requirement on the record key and instead enforce it on the result
of the {{keySelector}}.
From the code, it looks like all of this is done in the {{process}} method in
{{KStreamKTableJoinProcessor.java}}, so the change should be as simple as
changing the check for {{key == null}} in the code there to
{{keyMapper.apply(key, value) == null}} (the default keyMapper just returns
the key) and then to add some tests. Does that sound right? Method copied
below for reference.
{code:java}
public void process(final K1 key, final V1 value) {
    // Change key == null to keyMapper.apply(key, value) == null
    if (key == null || value == null) {
        LOG.warn(
            "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
            key, value, context().topic(), context().partition(), context().offset()
        );
        droppedRecordsSensor.record();
    } else {
        final K2 mappedKey = keyMapper.apply(key, value);
        // Can remove this null check
        final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));
        if (leftJoin || value2 != null) {
            context().forward(key, joiner.apply(value, value2));
        }
    }
}
{code}
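To make the intended behaviour concrete, here is a rough, self-contained sketch of the relaxed check. It is not the actual Kafka Streams code: the class {{RelaxedKeyJoinSketch}} and its fields {{table}}, {{forwarded}} and {{dropped}} are hypothetical stand-ins for the processor, the GlobalKTable, {{context().forward(...)}} and {{droppedRecordsSensor}}. It also assumes the value is checked before the mapper is called, so the mapper never sees a null value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for the join processor; none of these names are Kafka Streams API.
class RelaxedKeyJoinSketch {
    final Map<String, String> table = new HashMap<>();  // stand-in for the GlobalKTable
    final List<String> forwarded = new ArrayList<>();   // stand-in for context().forward(...)
    int dropped = 0;                                     // stand-in for droppedRecordsSensor

    private final BiFunction<String, String, String> keyMapper;
    private final boolean leftJoin;

    RelaxedKeyJoinSketch(final BiFunction<String, String, String> keyMapper,
                         final boolean leftJoin) {
        this.keyMapper = keyMapper;
        this.leftJoin = leftJoin;
    }

    void process(final String key, final String value) {
        // Assumption: a null value is still rejected, and rejected before the mapper runs.
        if (value == null) {
            dropped++;
            return;
        }
        // The record key itself may be null; only the mapped join key must be non-null.
        final String mappedKey = keyMapper.apply(key, value);
        if (mappedKey == null) {
            dropped++;
            return;
        }
        final String value2 = table.get(mappedKey);
        if (leftJoin || value2 != null) {
            forwarded.add(value + "/" + value2);
        }
    }
}
```

With a mapper that derives the join key from the value, a record with a null key still joins, while a record whose mapped key is null is counted as dropped. A record whose mapped key has no match in the table is simply not forwarded by an inner join, and is not counted as dropped.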
> Relax non-null key requirement for KStream-GlobalKTable joins
> -------------------------------------------------------------
>
> Key: KAFKA-10277
> URL: https://issues.apache.org/jira/browse/KAFKA-10277
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Priority: Major
> Labels: beginner, newbie
>
> In general, Kafka Streams requires that key and value are non-null for
> aggregations and join input records. While this requirement is reasonable in
> general, for KStream-GlobalKTable joins it's questionable for the stream
> input record key. The join is based on a `keySelector` and it seems to be
> sufficient to require that the `keySelector` returns a non-null join key for
> the stream record.
> We should consider relaxing the non-null key requirement for stream-side
> input records.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)