Hi Patrik,

Just to drop one observation in... Streaming to a topic and then consuming it as a table does create overhead, but so does reducing a stream to a table, and I think it's actually the same in either case.
They both require a store to collect the table state, and in both cases the stores need a changelog topic. For the "reduce" version, it's an internal changelog topic, and for the "topic-to-table" version, the store can use the intermediate topic as its changelog. This doesn't address your ergonomic concern, but it seemed worth pointing out that (as far as I can tell) there is no difference in overhead.

Hope this helps!

-John

On Fri, Oct 26, 2018 at 3:27 AM Patrik Kleindl <pklei...@gmail.com> wrote:
> Hello Matthias,
> thank you for the explanation.
> Streaming back to a topic and consuming this as a KTable does respect the
> null values as deletes, correct? But at the price of some overhead.
> Is there any (historical, technical or emotional ;-)) reason that no simple
> one-step stream-to-table operation exists?
> Best regards
> Patrik
>
> > Am 26.10.2018 um 00:07 schrieb Matthias J. Sax <matth...@confluent.io>:
> >
> > Patrik,
> >
> > `null` values in a KStream don't have delete semantics (it's not a
> > changelog stream). That's why we drop them in the KStream#reduce
> > implementation.
> >
> > If you want to explicitly remove results for a key from the result
> > KTable, your `Reducer#apply()` implementation must return `null` -- the
> > result of #apply() has changelog/KTable semantics, and `null` is
> > interpreted as a delete in this case.
> >
> > If you want a `null` value from your KStream to trigger reduce() to
> > delete, you will need to use a surrogate value for this, i.e., do a
> > mapValues() before the groupByKey() call and replace `null` values with
> > a surrogate delete marker that you can evaluate in `Reducer#apply()`
> > to return `null` for this case.
> >
> > Hope this helps.
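[Editor's note: Matthias's surrogate-marker workaround can be sketched in plain Java. This is not the Kafka Streams API, just a hypothetical simulation of the pattern: a `mapValues()`-style step replaces `null` with a marker so the record survives the null check, and the reducer step returns a delete for the marker. The class name, `TOMBSTONE` constant, and record layout are all illustrative assumptions.]

```java
import java.util.*;

// Hypothetical plain-Java sketch (not the Kafka Streams API) of the
// surrogate-marker pattern: nulls are mapped to a marker before the reduce,
// and the reducer treats the marker as a delete with changelog semantics.
public class SurrogateDeleteSketch {
    static final String TOMBSTONE = "__delete__"; // illustrative surrogate marker

    // Simulates the resulting KTable state after reducing a stream of records.
    static Map<String, String> reduce(List<Map.Entry<String, String>> records) {
        Map<String, String> table = new HashMap<>();
        for (Map.Entry<String, String> record : records) {
            // mapValues() step: a null value becomes the surrogate marker,
            // so it is NOT skipped the way KStream#reduce skips nulls.
            String value = record.getValue() == null ? TOMBSTONE : record.getValue();
            if (TOMBSTONE.equals(value)) {
                // Reducer#apply() step: returning null for the marker has
                // changelog/KTable semantics, i.e. the key is deleted.
                table.remove(record.getKey());
            } else {
                // Latest value wins, standing in for a real Reducer#apply().
                table.merge(record.getKey(), value, (oldV, newV) -> newV);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
            Map.entry("a", "1"),
            Map.entry("b", "2"),
            new AbstractMap.SimpleEntry<String, String>("a", null) // delete for "a"
        );
        System.out.println(reduce(records)); // {b=2}
    }
}
```

With the marker in place, the `(a, null)` record actually reaches the reducer and removes key `a`, instead of being dropped by the null-value check.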
> >
> > -Matthias
> >
> >> On 10/25/18 10:36 AM, Patrik Kleindl wrote:
> >> Hello
> >>
> >> Recently we noticed a lot of warning messages in the logs which pointed
> >> to this method (we are running 2.0):
> >>
> >> KStreamReduce
> >> public void process(final K key, final V value) {
> >>     // If the key or value is null we don't need to proceed
> >>     if (key == null || value == null) {
> >>         LOG.warn(
> >>             "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
> >>             key, value, context().topic(), context().partition(), context().offset()
> >>         );
> >>         metrics.skippedRecordsSensor().record();
> >>         return;
> >>     }
> >>
> >> This was triggered for every record from a stream with an existing key
> >> but a null value which we put through groupBy/reduce to get a KTable.
> >> My assumption was that this was the correct way inside a streams
> >> application to get a KTable, but this prevents deletion of records from
> >> working.
> >>
> >> Our alternative is to send the stream back to a named topic and build a
> >> new table from it, but this is rather cumbersome and requires a separate
> >> topic which also can't be cleaned up by the streams reset tool.
> >>
> >> Did I miss anything relevant here?
> >> Would it be possible to create a separate method for KStream to achieve
> >> this directly?
> >>
> >> best regards
> >>
> >> Patrik
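[Editor's note: the behavior Patrik describes, where a null-valued record triggers the warning and never deletes anything, can be sketched as a plain-Java simulation of the null check shown in his quoted snippet. The class and method names here are illustrative, not part of Kafka Streams.]

```java
import java.util.*;

// Hypothetical sketch of why groupBy/reduce cannot delete: records with a
// null key or value are skipped (as in the quoted KStreamReduce#process),
// so a (key, null) "delete" never reaches the state store.
public class NullSkipSketch {
    static Map<String, String> reduceSkippingNulls(List<Map.Entry<String, String>> records) {
        Map<String, String> table = new HashMap<>();
        for (Map.Entry<String, String> r : records) {
            if (r.getKey() == null || r.getValue() == null) {
                // "Skipping record due to null key or value" -- the record
                // is dropped here, so the table is never updated for it.
                continue;
            }
            table.put(r.getKey(), r.getValue()); // latest value wins
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
            Map.entry("a", "1"),
            new AbstractMap.SimpleEntry<String, String>("a", null) // intended delete
        );
        System.out.println(reduceSkippingNulls(records)); // {a=1} -- "a" survives
    }
}
```

This is exactly the stale-entry problem: the intended delete for key `a` is skipped, so the table keeps the old value, whereas consuming the same records from a topic as a KTable would interpret the null as a tombstone and remove the key.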