Patrik,

`null` values in a KStream don't have delete semantics (it's not a
changelog stream). That's why we drop them in the KStream#reduce
implementation.

If you want to explicitly remove results for a key from the result
KTable, your `Reducer#apply()` implementation must return `null` -- the
result of #apply() has changelog/KTable semantics and `null` is
interpreted as delete for this case.

If you want to use `null` from your KStream to trigger reduce() to
delete, you will need to use a surrogate value for this, i.e., do a
mapValues() before the groupByKey() call and replace `null` values with
a surrogate delete marker that you can evaluate in `Reducer#apply()`
to return `null` for this case.
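
For illustration, a minimal sketch of the surrogate approach. The
marker value `__DELETE__` and the class and method names are made up
for this example, and it assumes String values; pick a marker that can
never occur as real data.

```java
// Hypothetical helper: the two functions you would plug into the topology.
final class SurrogateDelete {

    // Assumed marker value; it must never appear as a legitimate record value.
    static final String DELETE_MARKER = "__DELETE__";

    // For KStream#mapValues(): replace null with the marker so that
    // reduce() does not skip the record.
    static String toSurrogate(final String value) {
        return value == null ? DELETE_MARKER : value;
    }

    // For Reducer#apply(): returning null deletes the key from the result
    // KTable; otherwise the latest value wins.
    static String reduce(final String aggregate, final String newValue) {
        return DELETE_MARKER.equals(newValue) ? null : newValue;
    }
}
```

You would wire it in roughly as
`stream.mapValues(SurrogateDelete::toSurrogate).groupByKey().reduce(SurrogateDelete::reduce)`.
One caveat, as far as I recall the implementation: `Reducer#apply()` is
not called for the first record of a key, so a marker that arrives for a
key with no current result would be stored as the value itself; you may
want to filter it out downstream.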

Hope this helps.

-Matthias

On 10/25/18 10:36 AM, Patrik Kleindl wrote:
> Hello
> 
> Recently we noticed a lot of warning messages in the logs which pointed to
> this method (we are running 2.0):
> 
> KStreamReduce
> public void process(final K key, final V value) {
>             // If the key or value is null we don't need to proceed
>             if (key == null || value == null) {
>                 LOG.warn(
>                     "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
>                     key, value, context().topic(), context().partition(), context().offset()
>                 );
>                 metrics.skippedRecordsSensor().record();
>                 return;
>             }
> 
> This was triggered for every record from a stream with an existing key but
> a null value which we put through groupBy/reduce to get a KTable.
> My assumption was that this was the correct way inside a streams
> application to get a KTable but this prevents deletion of records from
> working.
> 
> Our alternative is to send the stream back to a named topic and build a new
> table from it, but this is rather cumbersome and requires a separate topic
> which also can't be cleaned up by the streams reset tool.
> 
> Did I miss anything relevant here?
> Would it be possible to create a separate method for KStream to achieve
> this directly?
> 
> best regards
> 
> Patrik
> 
