Hi Matthias,

Here are my updates on your points.

101.
> You propose to add static methods `keySerde()` and `valueSerde()` --
> in other config classes, we use only `with(keySerde, valueSerde)` as we
try
> to use the "builder" pattern, and avoid too many overloads. I would
> prefer to omit both methods you suggest and just use a single `with` for
> both serdes.

I was actually inspired by the other config classes, for example `Joined`
and
`Grouped` both have the static methods `keySerde()` and `valueSerde()`.

> I think we don't want to add `with(...)` which takes all
> parameters at once

Done.


102.
Thanks, your suggestion sounds good to me. The trade-off of having an index
that allows to efficiently purge expired records besides the keyValue store
makes sense. I've been looking into the code, and I think a similar idea
was implemented for other processors (for example with
DualSchemaRocksDBSegmentedBytesStore).
As you said, I think we would benefit from some existing code here.
KIP updated !


104.
Updated the KIP to consider records' offsets.


105
> picking the first offset with smallest ts sounds good to me. The KIP
> should be explicit about it

Done.

> But as discussed above, it might be
> simplest to not really have a window lookup, but just a plain key-lookup
> and drop if the key exists in the store?

KIP updated, we will be `.get()`ing from a keyValueStore instead of
`.fetch()`ing
from a WindowStore.

> Another line of thinking, that did serve us well in the past: in doubt
> keep a record -- users can add operators to drop record (in case they
> don't want to keep it), but if we drop a record, users have no way to
> resurrect it (thus, building a workaround to change semantica is
> possible for users if we default to keep records, but not the other way
> around).

Makes total sense ! I updated the KIP to forward late records instead of
dropping them.


106.
For the moment, I highlighted in Javadocs that we are deduplicating by
partition. If there is a better name to have this information in the name
of the api itself it would be good.


Best,
Ayoub


Le jeu. 13 juin 2024 à 09:03, Sebastien Viale <sebastien.vi...@michelin.com>
a écrit :

>
> Hi,
>
> 106 :
>
> Thanks for the clarification. Actually, this is not what I expected, but I
> better understand the performance issues regarding the state store
> iteration.
> If this is how it should be designed, it is fine for me as long as it is
> clear that the repartition must be done before the deduplication.
> Sébastien
>
> ________________________________
> De : Matthias J. Sax <mj...@apache.org>
> Envoyé : jeudi 13 juin 2024 02:51
> À : dev@kafka.apache.org <dev@kafka.apache.org>
> Objet : [EXT] Re: [DISCUSS] KIP-655: Add deduplication processor in
> kafka-streams
>
> Warning External sender Do not click on any links or open any attachments
> unless you trust the sender and know the content is safe.
>
> 106:
>
> > For the use-case of deduplicating a "at least once written" stream,
> > we are sure that the duplicate record has the same key as the
> > original one, and will land on the same task. Here, a user would
> > want to specify a deduplication key different from the topic's key
> > in case the topic's key is not a unique identifier
> >
> > For example, we have a topic with keyValue (`userId`, `transaction`)
> > and deduplication is done on `transaction`.`id` . Here, the application
> > wants to deduplicate transactions. It knows that a transaction id
> > maps to a single userId. Any duplicate of that record would be received
> > by the task which processes this userId.
>
> This is an interesting point.
>
> My concern is to some extend, that it seems (on the surface) to not
> follow the established pattern of auto-repartitioning in the DSL. Of
> course, given that the current proposal says we use an "id extractor"
> and not a "key extractor" it might be ok (but it might be somewhat
> subtle). Of course, JavaDocs always help to explain in detail. Would
> this be enough?
>
> Would be good to hear from others about this point. I am personally not
> sure which approach I would prefer personally at this point.
>
> The problem reminds me on
> https://issues.apache.org/jira/browse/KAFKA-10844<
> https://issues.apache.org/jira/browse/KAFKA-10844> which is not resolve
> directly either. We do have KIP-759
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-759: Unneeded
> repartition canceling<
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759: Unneeded
> repartition canceling>)
> which is WIP and helps with KAFKA-10844, but not sure if it would be a
> viable solution for the de-duplication case?
>
>
>
> -Matthias
>
> This email was screened for spam and malicious content but exercise
> caution anyway.
>
>
>
>
> On 6/11/24 2:31 PM, Ayoub Omari wrote:
> > Hi Sebastien & Matthias,
> >
> > For 106.
> > My idea was to deduplicate on a per-task basis. If the user wants
> > to do a global deduplication over all partitions, I think it's better to
> > have him explicitly repartition and then call the deduplication
> processor.
> >
> > For the use-case of deduplicating a "at least once written" stream,
> > we are sure that the duplicate record has the same key as the
> > original one, and will land on the same task. Here, a user would
> > want to specify a deduplication key different from the topic's key
> > in case the topic's key is not a unique identifier.
> >
> > For example, we have a topic with keyValue (`userId`, `transaction`)
> > and deduplication is done on `transaction`.`id` . Here, the application
> > wants to deduplicate transactions. It knows that a transaction id
> > maps to a single userId. Any duplicate of that record would be received
> > by the task which processes this userId.
> >
> > One other thought I have when writing the KIP about global deduplication,
> > is that it will require to map twice the key of the stream (first map to
> > change the key to deduplication key, and second map to get
> > back the initial key). Second map may imply a second repartitioning.
> >
> > However, if we do a per-task deduplication, the user may adapt to his
> > specific use-case.
> >
> > Let me know what you think
> > Ayoub
> >
> >
> >
> > Le mar. 11 juin 2024 à 20:21, Matthias J. Sax <mj...@apache.org> a
> écrit :
> >
> >> Thanks Sebastien,
> >>
> >> that's a good point. Thanks for raising it. -- I like your proposal.
> >>
> >> An alternative would be to have two overloads of `deduplicate()` one w/
> >> and one w/o the "id extractor" parameter. This would be less explicit
> >> though.
> >>
> >>
> >> -Matthias
> >>
> >> On 6/11/24 2:30 AM, Sebastien Viale wrote:
> >>> Hi,
> >>>
> >>> I am really interested in this KIP.
> >>>
> >>> 106:
> >>> I hope I am not talking nonsense, but if you do not deduplicate based
> on
> >> the key, the input stream has to be repartitioned.
> >>> Otherwise, different stream tasks may handle records that need to be
> >> deduplicated, and thus duplicates will not be detected.
> >>>
> >>> This is why I would have created two different methods, as is done for
> >> GroupBy:
> >>>
> >>> deduplicateByKey(...)
> >>> deduplicate(...)
> >>>
> >>> If deduplicateByKey is used, the input stream does not need to be
> >> repartitioned.
> >>>
> >>> thanks
> >>>
> >>> Sébastien
> >>> ________________________________
> >>> De : Matthias J. Sax <mj...@apache.org>
> >>> Envoyé : mardi 11 juin 2024 01:54
> >>> À : dev@kafka.apache.org <dev@kafka.apache.org>
> >>> Objet : [EXT] Re: [DISCUSS] KIP-655: Add deduplication processor in
> >> kafka-streams
> >>>
> >>> Warning External sender Do not click on any links or open any
> >> attachments unless you trust the sender and know the content is safe.
> >>>
> >>> Thanks for the update Ayoub.
> >>>
> >>>
> >>> 101: you say:
> >>>
> >>>> But I am not sure if we don't want to have them for this processor ?
> >>>
> >>> What is your reasoning to move off the established pattern? Would be
> >>> good to understand, why `Deduplicated` class needs a different
> >>> "structure" compared to existing classes.
> >>>
> >>>
> >>>
> >>> 102: Creating iterators is very expensive. For other work, we actually
> >>> hit 100x (?) throughput degradation by creating an (for most cases
> >>> empty) iterator for every input record, and needed to find other ways
> to
> >>> avoid creating an iterator per record. It really kills performance.
> >>>
> >>> I see the point about data expiration. We could experiment with
> >>> punctuation to expire old data, or add a second "time-ordered store"
> >>> (which we already have at hand) which acts as an index into the main
> >>> store. -- Another possibility would be to add a new version of
> segmented
> >>> store with a different key-layout (ie, just store the plain key). I
> >>> think with some refactoring, we might be able to re-use a lot of
> >>> existing code.
> >>>
> >>>
> >>>
> >>> 104:
> >>>
> >>>> This gets me wondering if this is a limitation of stateful processors
> >>>> in ALOS. For example a windowed aggregation with `on_window_close`
> >>>> emit strategy may have the same limitation today (we receive a record,
> >>>> we update its aggregation result in the store, then crash before
> >> committing,
> >>>> then the record will be again reconsidered for aggregation). Is this
> >>>> correct ?
> >>>
> >>> Yes, this is correct, but it does not violate ALOS, because we did not
> >>> lose the input record -- of course, the aggregation would contain the
> >>> input record twice (eg, over count), but this is ok under ALOS.
> >>> Unfortunately, for de-duplication this pattern breaks, because
> >>> de-duplication operator does a different "aggregation logic" depending
> >>> on its state (emit if no key found, but not emit if key found). For
> >>> counting as an example, we increment the count and emit unconditionally
> >>> though.
> >>>
> >>>
> >>>> As a workaround, I think storing the record's offset inside the
> >>>> store's value can tell us whether the record has been already seen or
> >> not.
> >>>> If we receive a record whose deduplication id exists in the store
> >>>> and the entry in the store has the same offset, it means the record
> >>>> is processed twice and we can go ahead and forward it. If the offset
> >>>> is different, it means it's a duplicate record, so we ignore it.
> >>>
> >>> Great idea. This might work... If we store the input record offset, we
> >>> can actually avoid that the "aggregation logic" changes for the same
> >>> input record. -- And yes, with ALOS potentially emitting a duplicate is
> >>> the-name-of-the-game, so no concerns on this part from my side.
> >>>
> >>>
> >>>
> >>> 105: picking the first offset with smallest ts sound good to me. The
> KIP
> >>> should be explicit about it. But as discussed above, it might be
> >>> simplest to not really have a window lookup, but just a plain
> key-lookup
> >>> and drop if the key exists in the store? -- For late records, it might
> >>> imply that they are not de-duplicated, but this is also the case for
> >>> in-order records if they are further apart than the de-duplication
> >>> window size, right? Thus I would believe this is "more natural"
> compared
> >>> to discarding late records pro-actively, which would lead to missing
> >>> result records?
> >>>
> >>> We could also make this configurable if we are not sure what users
> >>> really need -- or add such a configuration later in case the semantics
> >>> we pick don't work for some users.
> >>>
> >>> Another line of thinking, that did serve us well in the past: in doubt
> >>> keep a record -- users can add operators to drop record (in case they
> >>> don't want to keep it), but if we drop a record, users have no way to
> >>> resurrect it (thus, building a workaround to change semantica is
> >>> possible for users if we default to keep records, but not the other way
> >>> around).
> >>>
> >>> Would be good to get input from the broader community on this question
> >>> thought. In the end, it must be a use-case driven decision?
> >>>
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> This email was screened for spam and malicious content but exercise
> >> caution anyway.
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> On 6/6/24 5:02 AM, Ayoub Omari wrote:
> >>>> Hi Matthias,
> >>>>
> >>>> Thank you for your review !
> >>>>
> >>>> 100.
> >>>> I agree. I changed the name of the parameter to "idSelector".
> >>>> Because this id may be computed, It is better to call it "id" rather
> >> than
> >>>> field or attribute.
> >>>>
> >>>> 101.
> >>>> The reason I added the methods `keySerde()` and `valueSerde()` was to
> >>>> have the same capabilities as other serde classes (such as Grouped
> >>>> or Joined). As a Kafka-streams user, I usually use `with(keySerde,
> >>>> valueSerde)`
> >>>> as you suggested. But I am not sure if we don't want to have them for
> >> this
> >>>> processor ?
> >>>>
> >>>> 102.
> >>>> That's a good point ! Because we know that the window store will
> contain
> >>>> at most one instance of a given key, I am not sure how the range fetch
> >>>> on WindowStore compares to a KeyValueStore `get()` in this case.
> >>>> Wondering if the fact that the record's key is the prefix of the
> >> underlying
> >>>> keyValueStore's key ("<dataKey,ts>") may provide comparable
> performance
> >>>> to the random access of KeyValueStore ? Of course, the WindowStore
> >> fetch()
> >>>> would be less efficient because it may fetch from more than 1 segment,
> >> and
> >>>> because of some iterator overhead.
> >>>>
> >>>> The purpose of using a WindowStore is to automatically purge old data.
> >>>> For example, deduplicating a topic written at least once wouldn't
> >> require
> >>>> keeping a large history. This is not the case of using a KeyValueStore
> >>>> which would require scanning regularly to remove expired records.
> >>>> That might cause a sudden increase of latency whenever the cleanup
> >>>> is triggered.
> >>>>
> >>>> It would be good to hear from anyone who has done some analysis
> >>>> on RocksDB's range fetch.
> >>>>
> >>>> 103.
> >>>> Sure, I can update it once we agree on underlying semantics.
> >>>>
> >>>> 104.
> >>>> Another good point !
> >>>>
> >>>>> In the end, de-duplication does only make sense when EOS is used
> >>>>
> >>>> I agree with that. And for me, the use case of deduplicating a topic
> >>>> written ALOS inside an EOS application might be the top 1 use case
> >>>> of deduplication.
> >>>>
> >>>>> all downstream processing happens, `context.forward()` returns
> >>>>> and we update the state store, we could now crash w/o committing
> >> offsets
> >>>>
> >>>> This gets me wondering if this is a limitation of stateful processors
> >>>> in ALOS. For example a windowed aggregation with `on_window_close`
> >>>> emit strategy may have the same limitation today (we receive a record,
> >>>> we update its aggregation result in the store, then crash before
> >> committing,
> >>>> then the record will be again reconsidered for aggregation). Is this
> >>>> correct ?
> >>>>
> >>>> As a workaround, I think storing the record's offset inside the
> >>>> store's value can tell us whether the record has been already seen or
> >> not.
> >>>> If we receive a record whose deduplication id exists in the store
> >>>> and the entry in the store has the same offset, it means the record
> >>>> is processed twice and we can go ahead and forward it. If the offset
> >>>> is different, it means it's a duplicate record, so we ignore it.
> >>>>
> >>>> As you said, we don't have any guarantee whether the initial record
> was
> >>>> forwarded or not in case of a crash before commit. In this solution
> >>>> we would forward the record twice, which is against deduplication.
> >>>> But, this is still an ALOS application, so it has the same semantics
> >>>> as any other such application. With this, I am not sure we can
> >>>> have "strict" deduplication for ALOS applications.
> >>>>
> >>>> 105.
> >>>> For me, if there are two duplicate records, it means they are
> >>>> the same in the application's point of view, so it can choose
> >>>> either one. Thus, I would go with forwarding the record with
> >>>> the least offset.
> >>>>
> >>>>> Would it not be desired to drop all duplicates independent
> >>>>> of their ts, as long as we find a record in the store?
> >>>>
> >>>> This is actually related to the (suggested) windowed nature
> >>>> of deduplication. As in 102. we don't want to do a "forever"
> >>>> deduplication, which may be impossible for huge workloads
> >>>> where all records should be kept in the store. Hence, the fetch
> >>>> of timestamp between [ts-deduplicationInterval,
> >> ts+deduplicationInterval]
> >>>>
> >>>> About late records, this is again due to the windowed nature.
> >>>> Because the store won't save those late (i.e. expired) records,
> >>>> we have two options. Either, we do not apply deduplication
> >>>> on them, thus the deduplication doesn't work on late records,
> >>>> or we discard them (which is the option I suggest).
> >>>> In the second case, It would be up to the user to choose
> >>>> any deduplicationInterval that may sufficiently cover all his late
> data.
> >>>> What do you think ?
> >>>>
> >>>> Thanks,
> >>>> Ayoub
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> Le mar. 4 juin 2024 à 23:58, Matthias J. Sax <mj...@apache.org> a
> >> écrit :
> >>>>
> >>>>> Ayoub,
> >>>>>
> >>>>> thanks for resurrecting this KIP. I think a built-in de-duplication
> >>>>> operator will be very useful.
> >>>>>
> >>>>>
> >>>>> Couple of questions:
> >>>>>
> >>>>>
> >>>>>
> >>>>> 100: `deduplicationKeySelector`
> >>>>>
> >>>>> Is this the best name? It might indicate that we select a "key" what
> is
> >>>>> an overloaded term... Maybe we could use `Field` or `Id` or
> `Attribute`
> >>>>> instead of `Key` in the name? Just brainstorming. If we think `Key`
> is
> >>>>> the best word, I am also ok with it.
> >>>>>
> >>>>>
> >>>>>
> >>>>> 101: `Deduplicated` class
> >>>>>
> >>>>> You propose to add static methods `keySerde()` and `valueSerde()` --
> in
> >>>>> other config classes, we use only `with(keySerde, valueSerde)` as we
> >> try
> >>>>> to use the "builder" pattern, and avoid too many overloads. I would
> >>>>> prefer to omit both methods you suggest and just use a single `with`
> >> for
> >>>>> both serdes.
> >>>>>
> >>>>> Similarly, I thing we don't want to add `with(...)` which takes all
> >>>>> parameters at once (which should only be 3 parameters, not 4 as it's
> >>>>> currently in the KIP)?
> >>>>>
> >>>>>
> >>>>>
> >>>>> 102: Usage of `WindowedStore`:
> >>>>>
> >>>>> Would this be efficient? The physical byte layout it "<dataKey,ts>"
> for
> >>>>> the store key, so it would be difficult to do an efficient lookup
> for a
> >>>>> given "de-duplication key" to discard duplicates, as we don't know
> the
> >>>>> timestamp of the first record with the same "de-duplication key".
> >>>>>
> >>>>> This boils down to the actual de-duplication logic (some more
> comments
> >>>>> below), but what you propose seems to require expensive range-scans
> >> what
> >>>>> could be cost prohibitive in practice. I think we need to find a way
> to
> >>>>> use efficient key-point-lookups to make this work.
> >>>>>
> >>>>>
> >>>>>
> >>>>> 103: "Processing logic":
> >>>>>
> >>>>> Might need some updates (Cf 102 comment). I am not sure if I fully
> >>>>> understand the logic: cf 105 below.
> >>>>>
> >>>>>
> >>>>>
> >>>>> 104:
> >>>>>
> >>>>>> If no entries found → forward the record + save the record in the
> >> store
> >>>>>
> >>>>> This part is critical, and we should discuss in detail. In the end,
> >>>>> de-duplication does only make sense when EOS is used, and we might
> want
> >>>>> to call this out (eg, on the JavaDocs)? But if used with ALOS, it's
> >> very
> >>>>> difficult to ensure that we never lose data... Your proposal to
> >>>>> first-forward goes into the right direction, but does not really
> solve
> >>>>> the problem entirely:
> >>>>>
> >>>>> Even if we forward the message first, all downstream processing
> >> happens,
> >>>>> `context.forward()` returns and we update the state store, we could
> now
> >>>>> crash w/o committing offsets. For this case, we have no guarantee
> that
> >>>>> the result records where published (as we did not flush the producer
> >>>>> yet), but when re-reading from the input topic, we would find the
> >> record
> >>>>> in the store and incorrectly drop as duplicate...
> >>>>>
> >>>>> I think the only solution to make this work would be to use TX-state
> >>>>> stores in combination with ALOS as proposed via KIP-892?
> >>>>>
> >>>>> Using an in-memory store won't help much either? The producer could
> >> have
> >>>>> send the write into the changelog topic, but not into the result
> topic,
> >>>>> and thus we could still not guarantee ALOS...?
> >>>>>
> >>>>> How do we want to go about this? We could also say, this new operator
> >>>>> only works with EOS. Would this be too restrictive? -- At lest for
> >> know,
> >>>>> until KIP-892 lands, and we could relax it?
> >>>>>
> >>>>>
> >>>>>
> >>>>> 105: "How to detect late records"
> >>>>>
> >>>>> In the end, it seems to boil down to determine which of the records
> to
> >>>>> forward and which record to drop, for (1) the regular case and (2)
> the
> >>>>> out-of-order data case.
> >>>>>
> >>>>> Regular case (no out-of-order data): For this case, offset and ts
> order
> >>>>> is the same, and we can forward the first record we get. All later
> >>>>> record within "de-duplication period" with the same "de-duplication
> >> key"
> >>>>> would be dropped. If a record with the same "de-duplication key"
> >> arrives
> >>>>> after "de-duplication period" passed, we cannot drop it any longer,
> but
> >>>>> would still forward it, as by the contract of the operator /
> >>>>> de-duplication period.
> >>>>>
> >>>>> For the out-of-order case: The first question we need to answer is,
> do
> >>>>> we want to forward the record with the smallest offset or the record
> >>>>> with the smallest ts? Logically, forwarding with the smallest ts
> might
> >>>>> be "more correct", however, it implies we could only forward it after
> >>>>> "de-duplication period" passed, what might be undesired latency?
> Would
> >>>>> this be desired/acceptable?
> >>>>>
> >>>>> In contrast, if we forward record with the smallest offset (this is
> >> what
> >>>>> you seem to propose) we don't have a latency issue, but of course the
> >>>>> question what records to drop is more tricky to answer: it seems you
> >>>>> propose to compare the time difference of the stored record to the
> >>>>> current record, but I am wondering why? Would it not be desired to
> drop
> >>>>> all duplicates independent of their ts, as long as we find a record
> in
> >>>>> the store? Would be good to get some more motivation and tradeoffs
> >>>>> discussed about the different strategies we could use.
> >>>>>
> >>>>> You also propose to drop _any_ late record: I am also not sure if
> >> that's
> >>>>> desired? Could this not lead to data loss? Assume we get a late
> record,
> >>>>> but in fact there was never a duplicate? Why would we want to drop
> it?
> >>>>> If there is a late record which is indeed a duplicate, but we purged
> >> the
> >>>>> original record from the store already, it seems to be the same case
> as
> >>>>> for the "no out-of-order case": after we purged we cannot
> de-duplicate
> >>>>> and thus it's a regular case we can just accept?
> >>>>>
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On 5/29/24 4:58 AM, Ayoub Omari wrote:
> >>>>>> Hi everyone,
> >>>>>>
> >>>>>> I've just made a (small) change to this KIP about an implementation
> >>>>> detail.
> >>>>>> Please let me know your thoughts.
> >>>>>>
> >>>>>> Thank you,
> >>>>>> Ayoub
> >>>>>>
> >>>>>> Le lun. 20 mai 2024 à 21:13, Ayoub <ayoubomar...@gmail.com> a
> écrit :
> >>>>>>
> >>>>>>> Hello,
> >>>>>>>
> >>>>>>> Following a discussion on community slack channel, I would like to
> >>>>> revive
> >>>>>>> the discussion on the KIP-655, which is about adding a
> deduplication
> >>>>>>> processor in kafka-streams.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> >
> >> <
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> >
> >>>
> >>>>>>>
> >>>>>>> Even though the motivation is not quite the same as the initial
> one,
> >> I
> >>>>>>> updated the KIP rather than creating a new one, as I believe the
> end
> >>>>> goal
> >>>>>>> is the same.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Ayoub
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> >
>

Reply via email to