Hi Patrik,

Thanks for explaining your use case to us. While we can still discuss how
KStream should interpret null values in aggregations, there is one workaround
at the moment: if your deduplication logic can be written as a transformValues
operation, you can do the following:


builder.table("source-topic").transformValues(..., Materialized.as("store-name"))
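
For illustration, a slightly fleshed-out sketch of that (stripIrrelevantFields
is a hypothetical stand-in for your application-specific deduplication logic;
String serdes assumed for brevity):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;

final StreamsBuilder builder = new StreamsBuilder();

final KTable<String, String> deduped = builder
    .table("source-topic", Consumed.with(Serdes.String(), Serdes.String()))
    .transformValues(
        () -> new ValueTransformerWithKey<String, String, String>() {
            @Override
            public void init(final ProcessorContext context) { }

            @Override
            public String transform(final String key, final String value) {
                // tombstones (null values) pass through, so deletes still
                // take effect in the resulting KTable
                return value == null ? null : stripIrrelevantFields(value);
            }

            @Override
            public void close() { }
        },
        Materialized.as("store-name"));

// hypothetical helper: project away the fields whose changes should not
// trigger downstream updates
static String stripIrrelevantFields(final String value) {
    return value; // application-specific
}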

Note that with a recent PR that we are merging, the source KTable from
builder.table() will not be materialized if users do not specify a
materialized store name; only the value-transformed KTable will be
materialized:

https://github.com/apache/kafka/pull/5779


Would that work for you?

Guozhang


On Mon, Oct 29, 2018 at 2:08 AM Patrik Kleindl <pklei...@gmail.com> wrote:

> Hi John and Matthias
> thanks for the questions, maybe explaining our use case helps a bit:
> We are receiving CDC records (row-level insert/update/delete) in one topic
> per table. The key is derived from the DB records, the value is null in
> case of deletes. Those would be the immutable facts I guess.
> These topics are first streamed through a deduplication Transformer to drop
> changes on irrelevant fields.
> The results are translated to KTables and joined to each other to produce
> the same result as the SQL queries on the database, but faster. At this
> stage the delete/null records matter, because if a record gets deleted we
> want it to drop out of the join too. -> Our reduce approach produced
> unexpected results here.
> We took the deduplication step separately because in some cases we only
> need the KStream for processing.
> If you see a simpler/cleaner approach here I'm open to suggestions, of
> course.
>
> Regarding the overhead:
> 1) Named topics create management/maintenance overhead because they have to
> be created/treated separately (auto-create is not an option) and be
> considered in future changes, topology changes/resets and so on. The
> internal topic removes most of those issues.
> 2) One of our developers asked whether the traffic to/from the broker is
> actually the same in both scenarios. We expect that the same data is
> written to the broker in the named-topic case as in the reduce case, but
> if the KTable is maintained inside a streams topology, does it have to
> read back everything it sends to the broker, or can it keep the table
> internally? I hope it is clear what I mean; otherwise I can try to
> explain it further.
>
> best regards
>
> Patrik
>
>
> On Sat, 27 Oct 2018 at 23:50, John Roesler <j...@confluent.io> wrote:
>
> > Hi again Patrik,
> >
> > Actually, this is a good question... Can you share some context about why
> > you need to convert a stream to a table (including nulls as retractions)?
> >
> > Thanks,
> > -John
> >
> > On Fri, Oct 26, 2018 at 5:36 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > I don't know your overall application setup. However, a KStream
> > > semantically models immutable facts and has no update semantics.
> > > Thus, it seems semantically questionable to allow changing the
> > > semantics from facts to updates (the other way around is easier IMHO,
> > > and thus supported via KTable#toStream()).
> > >
> > > Does this make sense?
> > >
> > > Having said this: you _can_ write a KStream into a topic and read it
> > > back as a KTable. But it's semantically questionable to do so, IMHO.
> > > Maybe it makes sense for your specific application, but in general I
> > > don't think it does.
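> > >
> > > For example (just a sketch; topic name and serdes are illustrative):
> > >
> > > stream.to("target-topic", Produced.with(Serdes.String(), Serdes.String()));
> > > final KTable<String, String> table =
> > >     builder.table("target-topic", Consumed.with(Serdes.String(), Serdes.String()));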
> > >
> > >
> > > -Matthias
> > >
> > > On 10/26/18 9:30 AM, John Roesler wrote:
> > > > Hi Patrik,
> > > >
> > > > Just to drop one observation in... Streaming to a topic and then
> > > > consuming it as a table does create overhead, but so does reducing a
> > > > stream to a table, and I think it's actually the same in either case.
> > > >
> > > > They both require a store to collect the table state, and in both
> > > > cases, the stores need to have a changelog topic. For the "reduce"
> > > > version, it's an internal changelog topic, and for the
> > > > "topic-to-table" version, the store can use the intermediate topic as
> > > > its changelog.
> > > >
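> > > > For example, these two should incur roughly the same cost (a sketch;
> > > > topic and store names are illustrative):
> > > >
> > > > // "reduce" version: the store is backed by an internal changelog topic
> > > > stream.groupByKey()
> > > >       .reduce((oldValue, newValue) -> newValue, Materialized.as("table-store"));
> > > >
> > > > // "topic-to-table" version: the intermediate topic itself backs the store
> > > > stream.to("table-topic");
> > > > final KTable<String, String> table = builder.table("table-topic");
> > > >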
> > > > This doesn't address your ergonomic concern, but it seemed worth
> > > > pointing out that (as far as I can tell) there doesn't seem to be a
> > > > difference in overhead.
> > > >
> > > > Hope this helps!
> > > > -John
> > > >
> > > > On Fri, Oct 26, 2018 at 3:27 AM Patrik Kleindl <pklei...@gmail.com> wrote:
> > > >
> > > >> Hello Matthias,
> > > >> thank you for the explanation.
> > > >> Streaming back to a topic and consuming this as a KTable does
> > > >> respect the null values as deletes, correct? But at the price of
> > > >> some overhead.
> > > >> Is there any (historical, technical or emotional ;-)) reason that no
> > > >> simple one-step stream-to-table operation exists?
> > > >> Best regards
> > > >> Patrik
> > > >>
> > > >>> On 26.10.2018 at 00:07, Matthias J. Sax <matth...@confluent.io> wrote:
> > > >>>
> > > >>> Patrik,
> > > >>>
> > > >>> `null` values in a KStream don't have delete semantics (it's not a
> > > >>> changelog stream). That's why we drop them in the KStream#reduce
> > > >>> implementation.
> > > >>>
> > > >>> If you want to explicitly remove results for a key from the result
> > > >>> KTable, your `Reducer#apply()` implementation must return `null` --
> > > >>> the result of #apply() has changelog/KTable semantics, and `null`
> > > >>> is interpreted as a delete in this case.
> > > >>>
> > > >>> If you want to use `null` from your KStream to trigger reduce() to
> > > >>> delete, you will need to use a surrogate value for this, i.e., do a
> > > >>> mapValues() before the groupByKey() call and replace `null` values
> > > >>> with a surrogate delete marker that you can check for in
> > > >>> `Reducer#apply()` to return `null` in that case.
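> > > >>>
> > > >>> A minimal sketch of that idea ("__DELETE__" is an arbitrary
> > > >>> surrogate marker; String values assumed):
> > > >>>
> > > >>> final KTable<String, String> table = stream
> > > >>>     .mapValues(v -> v == null ? "__DELETE__" : v)
> > > >>>     .groupByKey()
> > > >>>     .reduce((oldValue, newValue) ->
> > > >>>         // returning null here has changelog semantics: the key is
> > > >>>         // removed from the result KTable
> > > >>>         "__DELETE__".equals(newValue) ? null : newValue);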
> > > >>>
> > > >>> Hope this helps.
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>>> On 10/25/18 10:36 AM, Patrik Kleindl wrote:
> > > >>>> Hello
> > > >>>>
> > > >>>> Recently we noticed a lot of warning messages in the logs which
> > > >>>> pointed to this method (we are running 2.0):
> > > >>>>
> > > >>>> KStreamReduce
> > > >>>> public void process(final K key, final V value) {
> > > >>>>     // If the key or value is null we don't need to proceed
> > > >>>>     if (key == null || value == null) {
> > > >>>>         LOG.warn(
> > > >>>>             "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
> > > >>>>             key, value, context().topic(), context().partition(), context().offset()
> > > >>>>         );
> > > >>>>         metrics.skippedRecordsSensor().record();
> > > >>>>         return;
> > > >>>>     }
> > > >>>>
> > > >>>> This was triggered for every record from a stream with an
> > > >>>> existing key but a null value, which we put through groupBy/reduce
> > > >>>> to get a KTable.
> > > >>>> My assumption was that this was the correct way to get a KTable
> > > >>>> inside a streams application, but this prevents deletion of
> > > >>>> records from working.
> > > >>>>
> > > >>>> Our alternative is to send the stream back to a named topic and
> > > >>>> build a new table from it, but this is rather cumbersome and
> > > >>>> requires a separate topic, which also can't be cleaned up by the
> > > >>>> streams reset tool.
> > > >>>>
> > > >>>> Did I miss anything relevant here?
> > > >>>> Would it be possible to create a separate method for KStream to
> > > >>>> achieve this directly?
> > > >>>>
> > > >>>> best regards
> > > >>>>
> > > >>>> Patrik
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> > >
> >
>


-- 
-- Guozhang
