I don't know your overall application setup. However, a KStream
semantically models immutable facts and there is not update semantic.
Thus, it seems semantically questionable, to allow changing the
semantics from facts to updates (the other way is easier IMHO, and thus
supported via KTable#toStream()).

Does this make sense?

Having said this: you _can_ write a KStream into a topic an read it back
as KTable. But it's semantically questionable to do so, IMHO. Maybe it
makes sense for your specific application, but in general I don't think
it does make sense.


-Matthias

On 10/26/18 9:30 AM, John Roesler wrote:
> Hi Patrik,
> 
> Just to drop one observation in... Streaming to a topic and then consuming
> it as a table does create overhead, but so does reducing a stream to a
> table, and I think it's actually the same in either case.
> 
> They both require a store to collect the table state, and in both cases,
> the stores need to have a changelog topic. For the "reduce" version, it's
> an internal changelog topic, and for the "topic-to-table" version, the
> store can use the intermediate topic as its changelog.
> 
> This doesn't address your ergonomic concern, but it seemed worth pointing
> out that (as far as I can tell), there doesn't seem to be a difference in
> overhead.
> 
> Hope this helps!
> -John
> 
> On Fri, Oct 26, 2018 at 3:27 AM Patrik Kleindl <pklei...@gmail.com> wrote:
> 
>> Hello Matthias,
>> thank you for the explanation.
>> Streaming back to a topic and consuming this as a KTable does respect the
>> null values as deletes, correct? But at the price of some overhead.
>> Is there any (historical, technical or emotional;-)) reason that no simple
>> one-step stream-to-table operation exists?
>> Best regards
>> Patrik
>>
>>> Am 26.10.2018 um 00:07 schrieb Matthias J. Sax <matth...@confluent.io>:
>>>
>>> Patrik,
>>>
>>> `null` values in a KStream don't have delete semantics (it's not a
>>> changelog stream). That's why we drop them in the KStream#reduce
>>> implemenation.
>>>
>>> If you want to explicitly remove results for a key from the result
>>> KTable, your `Reducer#apply()` implementation must return `null` -- the
>>> result of #apply() has changelog/KTable semantics and `null` is
>>> interpreted as delete for this case.
>>>
>>> If you want to use `null` from your KStream to trigger reduce() to
>>> delete, you will need to use a surrogate value for this, ie, do a
>>> mapValues() before the groupByKey() call, an replace `null` values with
>>> the surrogate-delete-marker that you can evaluate in `Reducer#apply()`
>>> to return `null` for this case.
>>>
>>> Hope this helps.
>>>
>>> -Matthias
>>>
>>>> On 10/25/18 10:36 AM, Patrik Kleindl wrote:
>>>> Hello
>>>>
>>>> Recently we noticed a lot of warning messages in the logs which pointed
>> to
>>>> this method (we are running 2.0):
>>>>
>>>> KStreamReduce
>>>> public void process(final K key, final V value) {
>>>>            // If the key or value is null we don't need to proceed
>>>>            if (key == null || value == null) {
>>>>                LOG.warn(
>>>>                    "Skipping record due to null key or value. key=[{}]
>>>> value=[{}] topic=[{}] partition=[{}] offset=[{}]",
>>>>                    key, value, context().topic(), context().partition(),
>>>> context().offset()
>>>>                );
>>>>                metrics.skippedRecordsSensor().record();
>>>>                return;
>>>>            }
>>>>
>>>> This was triggered for every record from a stream with an existing key
>> but
>>>> a null value which we put through groupBy/reduce to get a KTable.
>>>> My assumption was that this was the correct way inside a streams
>>>> application to get a KTable but this prevents deletion of records from
>>>> working.
>>>>
>>>> Our alternativ is to send the stream back to a named topic and build a
>> new
>>>> table from it, but this is rather cumbersome and requires a separate
>> topic
>>>> which also can't be cleaned up by the streams reset tool.
>>>>
>>>> Did I miss anything relevant here?
>>>> Would it be possible to create a separate method for KStream to achieve
>>>> this directly?
>>>>
>>>> best regards
>>>>
>>>> Patrik
>>>>
>>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to