Hi Matthias,

Here are my updates on your points.

101.
> You propose to add static methods `keySerde()` and `valueSerde()` --
> in other config classes, we use only `with(keySerde, valueSerde)` as we try
> to use the "builder" pattern, and avoid too many overloads. I would
> prefer to omit both methods you suggest and just use a single `with` for
> both serdes.

I was actually inspired by the other config classes; for example, `Joined` and
`Grouped` both have the static methods `keySerde()` and `valueSerde()`.

> I think we don't want to add `with(...)` which takes all
> parameters at once

Done.

102.
Thanks, your suggestion sounds good to me. The trade-off of maintaining an
index alongside the keyValue store so that expired records can be purged
efficiently makes sense. I've been looking into the code, and I think a
similar idea was implemented for other processors (for example with
DualSchemaRocksDBSegmentedBytesStore). As you said, I think we would benefit
from some existing code here. KIP updated!

104.
Updated the KIP to take records' offsets into account.

105.
> picking the first offset with smallest ts sounds good to me. The KIP
> should be explicit about it

Done.

> But as discussed above, it might be
> simplest to not really have a window lookup, but just a plain key-lookup
> and drop if the key exists in the store?

KIP updated, we will be `.get()`ing from a keyValueStore instead of
`.fetch()`ing from a WindowStore.

> Another line of thinking, that did serve us well in the past: in doubt
> keep a record -- users can add operators to drop records (in case they
> don't want to keep them), but if we drop a record, users have no way to
> resurrect it (thus, building a workaround to change semantics is
> possible for users if we default to keep records, but not the other way
> around).

Makes total sense! I updated the KIP to forward late records instead of
dropping them.
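To make the new semantics from 102, 104, and 105 a bit more concrete, here is
a rough sketch of the per-record logic as I currently picture it. The class,
store, and method names below are only illustrative, they are not the final
API from the KIP:

    import org.apache.kafka.streams.kstream.KeyValueMapper;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Illustrative sketch only -- names and store layout are not final.
    public class DeduplicationProcessor<K, V, ID> implements Processor<K, V, K, V> {

        private final KeyValueMapper<K, V, ID> idSelector;
        private ProcessorContext<K, V> context;
        // the value keeps the offset of the record that was forwarded first (104)
        private KeyValueStore<ID, Long> store;

        public DeduplicationProcessor(final KeyValueMapper<K, V, ID> idSelector) {
            this.idSelector = idSelector;
        }

        @Override
        public void init(final ProcessorContext<K, V> context) {
            this.context = context;
            this.store = context.getStateStore("deduplication-store");
        }

        @Override
        public void process(final Record<K, V> record) {
            final ID id = idSelector.apply(record.key(), record.value());
            final long offset = context.recordMetadata().map(m -> m.offset()).orElse(-1L);

            // plain key lookup instead of a WindowStore range fetch (102)
            final Long storedOffset = store.get(id);

            if (storedOffset == null) {
                // first record with this id within the deduplication period:
                // forward first, then remember it
                context.forward(record);
                store.put(id, offset);
            } else if (storedOffset == offset) {
                // same input record reprocessed after a crash under ALOS (104):
                // forward it again instead of wrongly treating it as a duplicate
                context.forward(record);
            }
            // otherwise: a duplicate within the deduplication period -> drop it
        }
    }

Expired entries would be purged through the secondary time-ordered index
discussed in 102 (not shown here), and a record whose id is no longer in the
store -- late or not -- is simply treated as first-seen again and forwarded
(105).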
106.
For the moment, I highlighted in the Javadocs that deduplication happens per
partition. If there is a better name that carries this information in the API
itself, that would be good.

Best,
Ayoub

On Thu, Jun 13, 2024 at 09:03, Sebastien Viale <sebastien.vi...@michelin.com> wrote:

>
> Hi,
>
> 106 :
>
> Thanks for the clarification. Actually, this is not what I expected, but I
> better understand the performance issues regarding the state store
> iteration.
> If this is how it should be designed, it is fine for me as long as it is
> clear that the repartition must be done before the deduplication.
>
> Sébastien
>
> ________________________________
> From: Matthias J. Sax <mj...@apache.org>
> Sent: Thursday, June 13, 2024 02:51
> To: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: [EXT] Re: [DISCUSS] KIP-655: Add deduplication processor in
> kafka-streams
>
> 106:
>
> > For the use-case of deduplicating a "at least once written" stream,
> > we are sure that the duplicate record has the same key as the
> > original one, and will land on the same task. Here, a user would
> > want to specify a deduplication key different from the topic's key
> > in case the topic's key is not a unique identifier
> >
> > For example, we have a topic with keyValue (`userId`, `transaction`)
> > and deduplication is done on `transaction`.`id`. Here, the application
> > wants to deduplicate transactions. It knows that a transaction id
> > maps to a single userId. Any duplicate of that record would be received
> > by the task which processes this userId.
>
> This is an interesting point.
>
> My concern is, to some extent, that it seems (on the surface) to not
> follow the established pattern of auto-repartitioning in the DSL. Of
> course, given that the current proposal says we use an "id extractor"
> and not a "key extractor" it might be ok (but it might be somewhat
> subtle). Of course, JavaDocs always help to explain in detail. Would
> this be enough?
>
> Would be good to hear from others about this point. I am personally not
> sure which approach I would prefer at this point.
>
> The problem reminds me of
> https://issues.apache.org/jira/browse/KAFKA-10844 which is not resolved
> directly either. We do have KIP-759
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-759: Unneeded
> repartition canceling)
> which is WIP and helps with KAFKA-10844, but not sure if it would be a
> viable solution for the de-duplication case?
>
>
> -Matthias
>
>
> On 6/11/24 2:31 PM, Ayoub Omari wrote:
> > Hi Sebastien & Matthias,
> >
> > For 106.
> > My idea was to deduplicate on a per-task basis. If the user wants
> > to do a global deduplication over all partitions, I think it's better to
> > have him explicitly repartition and then call the deduplication processor.
> >
> > For the use-case of deduplicating a "at least once written" stream,
> > we are sure that the duplicate record has the same key as the
> > original one, and will land on the same task. Here, a user would
> > want to specify a deduplication key different from the topic's key
> > in case the topic's key is not a unique identifier.
> >
> > For example, we have a topic with keyValue (`userId`, `transaction`)
> > and deduplication is done on `transaction`.`id`. Here, the application
> > wants to deduplicate transactions. It knows that a transaction id
> > maps to a single userId. Any duplicate of that record would be received
> > by the task which processes this userId.
> >
> > One other thought I had when writing the KIP about global deduplication
> > is that it will require mapping the key of the stream twice (first map to
> > change the key to the deduplication key, and second map to get
> > back the initial key). The second map may imply a second repartitioning.
> >
> > However, if we do a per-task deduplication, the user may adapt to his
> > specific use-case.
> >
> > Let me know what you think
> > Ayoub
> >
> >
> >
> > On Tue, Jun 11, 2024 at 20:21, Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> Thanks Sebastien,
> >>
> >> that's a good point. Thanks for raising it. -- I like your proposal.
> >>
> >> An alternative would be to have two overloads of `deduplicate()`, one w/
> >> and one w/o the "id extractor" parameter. This would be less explicit
> >> though.
> >>
> >>
> >> -Matthias
> >>
> >> On 6/11/24 2:30 AM, Sebastien Viale wrote:
> >>> Hi,
> >>>
> >>> I am really interested in this KIP.
> >>>
> >>> 106:
> >>> I hope I am not talking nonsense, but if you do not deduplicate based
> >>> on the key, the input stream has to be repartitioned.
> >>> Otherwise, different stream tasks may handle records that need to be
> >>> deduplicated, and thus duplicates will not be detected.
> >>>
> >>> This is why I would have created two different methods, as is done for
> >>> GroupBy:
> >>>
> >>> deduplicateByKey(...)
> >>> deduplicate(...)
> >>>
> >>> If deduplicateByKey is used, the input stream does not need to be
> >>> repartitioned.
> >>>
> >>> thanks
> >>>
> >>> Sébastien
> >>> ________________________________
> >>> From: Matthias J. Sax <mj...@apache.org>
> >>> Sent: Tuesday, June 11, 2024 01:54
> >>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> >>> Subject: [EXT] Re: [DISCUSS] KIP-655: Add deduplication processor in
> >>> kafka-streams
> >>>
> >>> Thanks for the update Ayoub.
> >>>
> >>>
> >>> 101: you say:
> >>>
> >>>> But I am not sure if we don't want to have them for this processor ?
> >>>
> >>> What is your reasoning to move off the established pattern? Would be
> >>> good to understand why the `Deduplicated` class needs a different
> >>> "structure" compared to existing classes.
> >>>
> >>>
> >>>
> >>> 102: Creating iterators is very expensive. For other work, we actually
> >>> hit 100x (?) throughput degradation by creating a (for most cases
> >>> empty) iterator for every input record, and needed to find other ways to
> >>> avoid creating an iterator per record. It really kills performance.
> >>>
> >>> I see the point about data expiration. We could experiment with
> >>> punctuation to expire old data, or add a second "time-ordered store"
> >>> (which we already have at hand) which acts as an index into the main
> >>> store. -- Another possibility would be to add a new version of segmented
> >>> store with a different key-layout (ie, just store the plain key). I
> >>> think with some refactoring, we might be able to re-use a lot of
> >>> existing code.
> >>>
> >>>
> >>>
> >>> 104:
> >>>
> >>>> This gets me wondering if this is a limitation of stateful processors
> >>>> in ALOS. For example a windowed aggregation with `on_window_close`
> >>>> emit strategy may have the same limitation today (we receive a record,
> >>>> we update its aggregation result in the store, then crash before
> >>>> committing, then the record will be again reconsidered for aggregation).
> >>>> Is this correct ?
> >>>
> >>> Yes, this is correct, but it does not violate ALOS, because we did not
> >>> lose the input record -- of course, the aggregation would contain the
> >>> input record twice (eg, over count), but this is ok under ALOS.
> >>> Unfortunately, for de-duplication this pattern breaks, because the
> >>> de-duplication operator does a different "aggregation logic" depending
> >>> on its state (emit if no key found, but not emit if key found). For
> >>> counting as an example, we increment the count and emit unconditionally
> >>> though.
> >>>
> >>>> As a workaround, I think storing the record's offset inside the
> >>>> store's value can tell us whether the record has been already seen or
> >>>> not. If we receive a record whose deduplication id exists in the store
> >>>> and the entry in the store has the same offset, it means the record
> >>>> is processed twice and we can go ahead and forward it. If the offset
> >>>> is different, it means it's a duplicate record, so we ignore it.
> >>>
> >>> Great idea. This might work... If we store the input record offset, we
> >>> can actually avoid that the "aggregation logic" changes for the same
> >>> input record. -- And yes, with ALOS potentially emitting a duplicate is
> >>> the-name-of-the-game, so no concerns on this part from my side.
> >>>
> >>>
> >>> 105: picking the first offset with smallest ts sounds good to me. The KIP
> >>> should be explicit about it. But as discussed above, it might be
> >>> simplest to not really have a window lookup, but just a plain key-lookup
> >>> and drop if the key exists in the store? -- For late records, it might
> >>> imply that they are not de-duplicated, but this is also the case for
> >>> in-order records if they are further apart than the de-duplication
> >>> window size, right? Thus I would believe this is "more natural" compared
> >>> to discarding late records pro-actively, which would lead to missing
> >>> result records?
> >>>
> >>> We could also make this configurable if we are not sure what users
> >>> really need -- or add such a configuration later in case the semantics
> >>> we pick don't work for some users.
> >>>
> >>> Another line of thinking, that did serve us well in the past: in doubt
> >>> keep a record -- users can add operators to drop records (in case they
> >>> don't want to keep them), but if we drop a record, users have no way to
> >>> resurrect it (thus, building a workaround to change semantics is
> >>> possible for users if we default to keep records, but not the other way
> >>> around).
> >>>
> >>> Would be good to get input from the broader community on this question
> >>> though. In the end, it must be a use-case driven decision?
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 6/6/24 5:02 AM, Ayoub Omari wrote:
> >>>> Hi Matthias,
> >>>>
> >>>> Thank you for your review!
> >>>>
> >>>> 100.
> >>>> I agree. I changed the name of the parameter to "idSelector".
> >>>> Because this id may be computed, it is better to call it "id" rather
> >>>> than field or attribute.
> >>>>
> >>>> 101.
> >>>> The reason I added the methods `keySerde()` and `valueSerde()` was to
> >>>> have the same capabilities as other serde classes (such as Grouped
> >>>> or Joined). As a Kafka-streams user, I usually use `with(keySerde,
> >>>> valueSerde)` as you suggested. But I am not sure if we don't want to
> >>>> have them for this processor ?
> >>>>
> >>>> 102.
> >>>> That's a good point! Because we know that the window store will contain
> >>>> at most one instance of a given key, I am not sure how the range fetch
> >>>> on WindowStore compares to a KeyValueStore `get()` in this case.
> >>>> Wondering if the fact that the record's key is the prefix of the
> >>>> underlying keyValueStore's key ("<dataKey,ts>") may provide comparable
> >>>> performance to the random access of a KeyValueStore ? Of course, the
> >>>> WindowStore fetch() would be less efficient because it may fetch from
> >>>> more than 1 segment, and because of some iterator overhead.
> >>>>
> >>>> The purpose of using a WindowStore is to automatically purge old data.
> >>>> For example, deduplicating a topic written at least once wouldn't
> >>>> require keeping a large history. This is not the case of using a
> >>>> KeyValueStore, which would require scanning regularly to remove expired
> >>>> records. That might cause a sudden increase of latency whenever the
> >>>> cleanup is triggered.
> >>>>
> >>>> It would be good to hear from anyone who has done some analysis
> >>>> on RocksDB's range fetch.
> >>>>
> >>>> 103.
> >>>> Sure, I can update it once we agree on underlying semantics.
> >>>>
> >>>> 104.
> >>>> Another good point!
> >>>>
> >>>>> In the end, de-duplication does only make sense when EOS is used
> >>>>
> >>>> I agree with that. And for me, the use case of deduplicating a topic
> >>>> written ALOS inside an EOS application might be the top 1 use case
> >>>> of deduplication.
> >>>>
> >>>>> all downstream processing happens, `context.forward()` returns
> >>>>> and we update the state store, we could now crash w/o committing
> >>>>> offsets
> >>>>
> >>>> This gets me wondering if this is a limitation of stateful processors
> >>>> in ALOS. For example a windowed aggregation with `on_window_close`
> >>>> emit strategy may have the same limitation today (we receive a record,
> >>>> we update its aggregation result in the store, then crash before
> >>>> committing, then the record will be again reconsidered for aggregation).
> >>>> Is this correct ?
> >>>>
> >>>> As a workaround, I think storing the record's offset inside the
> >>>> store's value can tell us whether the record has been already seen or
> >>>> not. If we receive a record whose deduplication id exists in the store
> >>>> and the entry in the store has the same offset, it means the record
> >>>> is processed twice and we can go ahead and forward it. If the offset
> >>>> is different, it means it's a duplicate record, so we ignore it.
> >>>>
> >>>> As you said, we don't have any guarantee whether the initial record was
> >>>> forwarded or not in case of a crash before commit. In this solution
> >>>> we would forward the record twice, which is against deduplication.
> >>>> But this is still an ALOS application, so it has the same semantics
> >>>> as any other such application. With this, I am not sure we can
> >>>> have "strict" deduplication for ALOS applications.
> >>>>
> >>>> 105.
> >>>> For me, if there are two duplicate records, it means they are
> >>>> the same from the application's point of view, so it can choose
> >>>> either one. Thus, I would go with forwarding the record with
> >>>> the least offset.
> >>>>
> >>>>> Would it not be desired to drop all duplicates independent
> >>>>> of their ts, as long as we find a record in the store?
> >>>>
> >>>> This is actually related to the (suggested) windowed nature
> >>>> of deduplication. As in 102., we don't want to do a "forever"
> >>>> deduplication, which may be impossible for huge workloads
> >>>> where all records should be kept in the store. Hence the fetch
> >>>> of timestamps between [ts-deduplicationInterval, ts+deduplicationInterval].
> >>>>
> >>>> About late records, this is again due to the windowed nature.
> >>>> Because the store won't save those late (i.e. expired) records,
> >>>> we have two options. Either we do not apply deduplication
> >>>> on them, thus the deduplication doesn't work on late records,
> >>>> or we discard them (which is the option I suggest).
> >>>> In the second case, it would be up to the user to choose
> >>>> any deduplicationInterval that may sufficiently cover all his late data.
> >>>> What do you think ?
> >>>>
> >>>> Thanks,
> >>>> Ayoub
> >>>>
> >>>>
> >>>> On Tue, Jun 4, 2024 at 23:58, Matthias J. Sax <mj...@apache.org> wrote:
> >>>>
> >>>>> Ayoub,
> >>>>>
> >>>>> thanks for resurrecting this KIP. I think a built-in de-duplication
> >>>>> operator will be very useful.
> >>>>>
> >>>>>
> >>>>> Couple of questions:
> >>>>>
> >>>>>
> >>>>> 100: `deduplicationKeySelector`
> >>>>>
> >>>>> Is this the best name?
> >>>>> It might indicate that we select a "key", which is
> >>>>> an overloaded term... Maybe we could use `Field` or `Id` or `Attribute`
> >>>>> instead of `Key` in the name? Just brainstorming. If we think `Key` is
> >>>>> the best word, I am also ok with it.
> >>>>>
> >>>>>
> >>>>> 101: `Deduplicated` class
> >>>>>
> >>>>> You propose to add static methods `keySerde()` and `valueSerde()` -- in
> >>>>> other config classes, we use only `with(keySerde, valueSerde)` as we try
> >>>>> to use the "builder" pattern, and avoid too many overloads. I would
> >>>>> prefer to omit both methods you suggest and just use a single `with` for
> >>>>> both serdes.
> >>>>>
> >>>>> Similarly, I think we don't want to add `with(...)` which takes all
> >>>>> parameters at once (which should only be 3 parameters, not 4 as it's
> >>>>> currently in the KIP)?
> >>>>>
> >>>>>
> >>>>> 102: Usage of `WindowedStore`:
> >>>>>
> >>>>> Would this be efficient? The physical byte layout is "<dataKey,ts>" for
> >>>>> the store key, so it would be difficult to do an efficient lookup for a
> >>>>> given "de-duplication key" to discard duplicates, as we don't know the
> >>>>> timestamp of the first record with the same "de-duplication key".
> >>>>>
> >>>>> This boils down to the actual de-duplication logic (some more comments
> >>>>> below), but what you propose seems to require expensive range-scans which
> >>>>> could be cost prohibitive in practice. I think we need to find a way to
> >>>>> use efficient key-point-lookups to make this work.
> >>>>>
> >>>>>
> >>>>> 103: "Processing logic":
> >>>>>
> >>>>> Might need some updates (cf. 102 comment). I am not sure if I fully
> >>>>> understand the logic: cf. 105 below.
> >>>>>
> >>>>>
> >>>>> 104:
> >>>>>
> >>>>>> If no entries found → forward the record + save the record in the store
> >>>>>
> >>>>> This part is critical, and we should discuss in detail. In the end,
> >>>>> de-duplication does only make sense when EOS is used, and we might want
> >>>>> to call this out (eg, on the JavaDocs)? But if used with ALOS, it's very
> >>>>> difficult to ensure that we never lose data... Your proposal to
> >>>>> first-forward goes in the right direction, but does not really solve
> >>>>> the problem entirely:
> >>>>>
> >>>>> Even if we forward the message first, all downstream processing happens,
> >>>>> `context.forward()` returns and we update the state store, we could now
> >>>>> crash w/o committing offsets. For this case, we have no guarantee that
> >>>>> the result records were published (as we did not flush the producer
> >>>>> yet), but when re-reading from the input topic, we would find the record
> >>>>> in the store and incorrectly drop it as a duplicate...
> >>>>>
> >>>>> I think the only solution to make this work would be to use TX-state
> >>>>> stores in combination with ALOS as proposed via KIP-892?
> >>>>>
> >>>>> Using an in-memory store won't help much either? The producer could have
> >>>>> sent the write into the changelog topic, but not into the result topic,
> >>>>> and thus we could still not guarantee ALOS...?
> >>>>>
> >>>>> How do we want to go about this? We could also say this new operator
> >>>>> only works with EOS. Would this be too restrictive? -- At least for now,
> >>>>> until KIP-892 lands, and we could relax it?
> >>>>> > >>>>> > >>>>> > >>>>> 105: "How to detect late records" > >>>>> > >>>>> In the end, it seems to boil down to determine which of the records > to > >>>>> forward and which record to drop, for (1) the regular case and (2) > the > >>>>> out-of-order data case. > >>>>> > >>>>> Regular case (no out-of-order data): For this case, offset and ts > order > >>>>> is the same, and we can forward the first record we get. All later > >>>>> record within "de-duplication period" with the same "de-duplication > >> key" > >>>>> would be dropped. If a record with the same "de-duplication key" > >> arrives > >>>>> after "de-duplication period" passed, we cannot drop it any longer, > but > >>>>> would still forward it, as by the contract of the operator / > >>>>> de-duplication period. > >>>>> > >>>>> For the out-of-order case: The first question we need to answer is, > do > >>>>> we want to forward the record with the smallest offset or the record > >>>>> with the smallest ts? Logically, forwarding with the smallest ts > might > >>>>> be "more correct", however, it implies we could only forward it after > >>>>> "de-duplication period" passed, what might be undesired latency? > Would > >>>>> this be desired/acceptable? > >>>>> > >>>>> In contrast, if we forward record with the smallest offset (this is > >> what > >>>>> you seem to propose) we don't have a latency issue, but of course the > >>>>> question what records to drop is more tricky to answer: it seems you > >>>>> propose to compare the time difference of the stored record to the > >>>>> current record, but I am wondering why? Would it not be desired to > drop > >>>>> all duplicates independent of their ts, as long as we find a record > in > >>>>> the store? Would be good to get some more motivation and tradeoffs > >>>>> discussed about the different strategies we could use. > >>>>> > >>>>> You also propose to drop _any_ late record: I am also not sure if > >> that's > >>>>> desired? Could this not lead to data loss? Assume we get a late > record, > >>>>> but in fact there was never a duplicate? Why would we want to drop > it? > >>>>> If there is a late record which is indeed a duplicate, but we purged > >> the > >>>>> original record from the store already, it seems to be the same case > as > >>>>> for the "no out-of-order case": after we purged we cannot > de-duplicate > >>>>> and thus it's a regular case we can just accept? > >>>>> > >>>>> > >>>>> > >>>>> -Matthias > >>>>> > >>>>> > >>>>> > >>>>> > >>>>> On 5/29/24 4:58 AM, Ayoub Omari wrote: > >>>>>> Hi everyone, > >>>>>> > >>>>>> I've just made a (small) change to this KIP about an implementation > >>>>> detail. > >>>>>> Please let me know your thoughts. > >>>>>> > >>>>>> Thank you, > >>>>>> Ayoub > >>>>>> > >>>>>> Le lun. 20 mai 2024 à 21:13, Ayoub <ayoubomar...@gmail.com> a > écrit : > >>>>>> > >>>>>>> Hello, > >>>>>>> > >>>>>>> Following a discussion on community slack channel, I would like to > >>>>> revive > >>>>>>> the discussion on the KIP-655, which is about adding a > deduplication > >>>>>>> processor in kafka-streams. 
> >>>>>>>
> >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> >>>>>>>
> >>>>>>> Even though the motivation is not quite the same as the initial one, I
> >>>>>>> updated the KIP rather than creating a new one, as I believe the end
> >>>>>>> goal is the same.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Ayoub