Hello
Recently we noticed a lot of warning messages in the logs which pointed to
this method (we are running 2.0):
KStreamReduce
public void process(final K key, final V value) {
// If the key or value is null we don't need to proceed
if (key == null || value == null) {
LOG.warn(
"Skipping record due to null key or value. key=[{}]
value=[{}] topic=[{}] partition=[{}] offset=[{}]",
key, value, context().topic(), context().partition(),
context().offset()
);
metrics.skippedRecordsSensor().record();
return;
}
This was triggered for every record from a stream with an existing key but
a null value which we put through groupBy/reduce to get a KTable.
My assumption was that this was the correct way inside a streams
application to get a KTable but this prevents deletion of records from
working.
Our alternativ is to send the stream back to a named topic and build a new
table from it, but this is rather cumbersome and requires a separate topic
which also can't be cleaned up by the streams reset tool.
Did I miss anything relevant here?
Would it be possible to create a separate method for KStream to achieve
this directly?
best regards
Patrik