Hey everyone! Thank you for participating.
The KIP-813 vote has passed with:

binding +1s (John, Matthias, Bill)
non-binding +1s (Daan, Federico)

Cheers,
D.

From: John Roesler <vvcep...@apache.org>
Date: Friday, 1 April 2022 at 15:54
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: Re: [DISCUSS] KIP-813 Shared State Stores

Thanks for the replies, Daan,

That all sounds good to me. I think standbys will probably come naturally, but we should make sure the implementation includes an integration test to verify that. Anyway, I just wanted to make sure we were on the same page.

Thanks again,
John

On Fri, Apr 1, 2022, at 08:16, Daan Gertis wrote:
> Hey John,
>
> * 1. Am I right in thinking that there's no way to enforce that the stores are actually read-only? It seems like the StoreBuilder interface is too generic for that. If that's true, I think it's fine, but we should be sure the JavaDoc clearly states that other processors must not write into these stores (except for the one that feeds it).
>
> Yeah, I couldn't really find a way to limit it easily. We might be able to throw unsupported exceptions by wrapping the state store, but that seems kind of brittle and feels a bit like a hack.
>
> Also, the function name clearly states it should be considered read-only.
>
> * 2. Are you planning for these stores to get standbys as well? I would think so, otherwise the desired purpose of standbys (eliminating restoration latency during failover) would not be served.
>
> Yeah, I think standbys should be applicable here as well. But we get that by implementing these read-only state stores as regular ones, right?
>
> Cheers,
> D.
>
> From: John Roesler <vvcep...@apache.org>
> Date: Friday, 1 April 2022 at 04:01
> To: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>
> Hi Daan,
>
> Thanks for the KIP!
>
> I just got caught up on the discussion. I just have some small questions, and then I will be ready to vote.
>
> 1. Am I right in thinking that there's no way to enforce that the stores are actually read-only? It seems like the StoreBuilder interface is too generic for that. If that's true, I think it's fine, but we should be sure the JavaDoc clearly states that other processors must not write into these stores (except for the one that feeds it).
>
> 2. Are you planning for these stores to get standbys as well? I would think so, otherwise the desired purpose of standbys (eliminating restoration latency during failover) would not be served.
>
> Thanks,
> John
>
> On Mon, Mar 7, 2022, at 13:13, Matthias J. Sax wrote:
>> Thanks for updating the KIP. LGTM.
>>
>> I think we can start a vote.
>>
>>> I think this might provide issues if your processor is doing a projection of the data.
>>
>> This is correct. It's a known issue: https://issues.apache.org/jira/browse/KAFKA-7663
>>
>> Global-stores/KTables are designed to put the data into the store _unmodified_.
>>
>> -Matthias
>>
>> On 2/28/22 5:05 AM, Daan Gertis wrote:
>>> Updated the KIP to be more aligned with the global state store function names.
>>>
>>> If I remember correctly, during restore the processor will not be used, right? I think this might provide issues if your processor is doing a projection of the data. Either way, I would not add that into this KIP since it is a specific use-case pattern.
>>>
>>> Unless there is anything more to add or change, I would propose moving to a vote?
>>>
>>> Cheers!
>>> D.
>>>
>>> From: Matthias J. Sax <mj...@apache.org>
>>> Date: Friday, 18 February 2022 at 03:29
>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>
>>> Thanks for updating the KIP!
>>>
>>> I am wondering if we would need two overloads of `addReadOnlyStateStore`, one w/ and one w/o a `TimestampExtractor` argument, to effectively make it an "optional" parameter?
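Daan's suggestion above — wrapping the state store so that writes throw unsupported-operation exceptions — could look roughly like the sketch below. This is a minimal illustration using a stand-in `KeyValueStore` interface (the real Kafka Streams interface has many more methods), not the actual KIP implementation:

```java
// Stand-in for the (much larger) Kafka Streams KeyValueStore interface,
// reduced to two methods for illustration.
interface KeyValueStore<K, V> {
    V get(K key);
    void put(K key, V value);
}

// Wrapper that forwards reads to the inner store but rejects writes,
// approximating a "read-only" store at runtime. As noted in the thread,
// this is brittle: it only fails at call time, not at compile time.
class ReadOnlyStoreWrapper<K, V> implements KeyValueStore<K, V> {
    private final KeyValueStore<K, V> inner;

    ReadOnlyStoreWrapper(KeyValueStore<K, V> inner) {
        this.inner = inner;
    }

    @Override
    public V get(K key) {
        return inner.get(key);
    }

    @Override
    public void put(K key, V value) {
        throw new UnsupportedOperationException(
            "store is read-only; only the feeding processor may write");
    }
}
```

Reads pass through unchanged, while any `put()` from a downstream processor fails loudly instead of silently corrupting shared state.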
>>> Also wondering if we need to pass in `String sourceName` and `String processorName` parameters (similar to `addGlobalStore()`?) instead of re-using the store name as currently proposed? -- In general I don't have a strong opinion either way, but it seems to introduce some API inconsistency if we don't follow the `addGlobalStore()` pattern?
>>>
>>>> Another thing we were confronted with was the restoring of state when the actual local storage is gone. For example, we host on K8s with ephemeral pods, so there is no persisted storage between pod restarts. However, the consumer group will already be at the latest offset, preventing previous data from being restored within the new pod's state store.
>>>
>>> We already have code in place in the runtime to do the right thing for this case (ie, via the DSL source-table changelog optimization). We can re-use this part. It's nothing we need to discuss on the KIP, but we can discuss it on the PR later.
>>>
>>> -Matthias
>>>
>>> On 2/17/22 10:09 AM, Guozhang Wang wrote:
>>>> Hi Daan,
>>>>
>>>> I think for the read-only state stores you'd need to slightly augment the checkpointing logic so that it would still write the checkpointed offsets while restoring from the changelogs.
>>>>
>>>> Guozhang
>>>>
>>>> On Thu, Feb 17, 2022 at 7:02 AM Daan Gertis <dger...@korfinancial.com> wrote:
>>>>
>>>>>> Could you add more details about the signature of `addReadOnlyStateStore()` -- What parameters does it take? Are there any overloads taking different parameters? The KIP only contains some verbal description in the "Implementation Plan" section, which is hard to find and hard to read.
>>>>>>
>>>>>> The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?
>>>>>>
>>>>>> About timestamp synchronization: why do you propose to disable timestamp synchronization (similar to global state stores)? It seems to be an unnecessary limitation? -- Given that we could re-use the new method for source `KTables` (ie, the `StreamsBuilder#table()` implementation), having timestamp synchronization enabled seems to be important?
>>>>>
>>>>> Yup, will do these updates. I'll overload `addReadOnlyStateStore` to allow for timestamp synchronization.
>>>>>
>>>>> Another thing we were confronted with was the restoring of state when the actual local storage is gone. For example, we host on K8s with ephemeral pods, so there is no persisted storage between pod restarts. However, the consumer group will already be at the latest offset, preventing previous data from being restored within the new pod's state store.
>>>>>
>>>>> If I remember correctly, there was some checkpoint logic available when restoring, but we are bypassing that since logging is disabled on the state store, no?
>>>>>
>>>>> As always, thanks for your insights.
>>>>>
>>>>> Cheers,
>>>>> D.
>>>>>
>>>>> From: Matthias J. Sax <mj...@apache.org>
>>>>> Date: Wednesday, 16 February 2022 at 02:09
>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>>
>>>>> Thanks for updating the KIP.
>>>>>
>>>>> Could you add more details about the signature of `addReadOnlyStateStore()` -- What parameters does it take? Are there any overloads taking different parameters? The KIP only contains some verbal description in the "Implementation Plan" section, which is hard to find and hard to read.
>>>>>
>>>>> The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?
>>>>>
>>>>> About timestamp synchronization: why do you propose to disable timestamp synchronization (similar to global state stores)? It seems to be an unnecessary limitation? -- Given that we could re-use the new method for source `KTables` (ie, the `StreamsBuilder#table()` implementation), having timestamp synchronization enabled seems to be important?
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 2/8/22 11:01 PM, Guozhang Wang wrote:
>>>>>> Daan,
>>>>>>
>>>>>> Thanks for the replies, those make sense to me.
>>>>>>
>>>>>> On Tue, Feb 8, 2022 at 7:24 AM Daan Gertis <dger...@korfinancial.com> wrote:
>>>>>>
>>>>>>> I just updated the KIP to reflect the things discussed in this thread.
>>>>>>>
>>>>>>> As for your questions, Guozhang:
>>>>>>>
>>>>>>>> 1) How do we handle it if the num.partitions of app A's store changelog is different from the num.tasks of app B's sub-topology with that read-only store? Or are we going to let each task of B keep a whole copy of the store of A by reading all of its changelog partitions, like global stores?
>>>>>>>
>>>>>>> Good question. Both need to be co-partitioned to have the data available. Another option would be to use IQ to make the request, but that seems far from ideal.
>>>>>>>
>>>>>>>> 2) Are we trying to synchronize the store updates from the changelog to app B's processing timelines, or, just like what we do for global stores, do we just update the read-only stores async?
>>>>>>>
>>>>>>> Pretty much the same as we do for global stores.
>>>>>>>
>>>>>>>> 3) If the answer to both of the above questions is the latter, then what's the main difference between adding a read-only store vs. adding a global store?
>>>>>>>
>>>>>>> I think because of the first answer the behavior differs from global stores.
>>>>>>>
>>>>>>> Makes sense?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> D.
>>>>>>>
>>>>>>> From: Matthias J. Sax <mj...@apache.org>
>>>>>>> Date: Thursday, 20 January 2022 at 21:12
>>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>>>>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>>>>
>>>>>>>> Any processor that would use that materialized, read-only state store would need to wait for the store to be restored. I can't find a way to make that possible since processors can't wait for the state store to be restored.
>>>>>>>
>>>>>>> This is built into the runtime already. Nothing to worry about. It's part of the regular restore logic -- as long as any store is restoring, all processing is blocked.
>>>>>>>
>>>>>>>> Also, since the state store would have logging disabled, it means there is no initial restoration going on.
>>>>>>>
>>>>>>> No. When we hook up the input topic as changelog (as the DSL does), we restore from the input topic during the regular restore phase. The restore logic does not even know it's reading from the input topic, and not from a "*-changelog" topic.
>>>>>>>
>>>>>>> Disabling changelogging only affects the write path (ie, `store.put()`) but not the restore path, due to the internal "hookup" of the input topic inside the restore logic.
>>>>>>>
>>>>>>> It's not easy to find/understand by reverse engineering, I guess, but it's there.
>>>>>>>
>>>>>>> One pointer to where the actual hookup happens (might help to dig into it more if you want):
>>>>>>>
>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L353-L356
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 1/20/22 10:04 AM, Guozhang Wang wrote:
>>>>>>>> Hello Daan,
>>>>>>>>
>>>>>>>> Thanks for writing the KIP.
>>>>>>>> I just read through it, and here are my 2c: to me it seems that one of the goals is to "externalize" the internal changelog topic of an application (say A) so that other consumers can read it directly --- though technically, without any auth, anyone knowing the topic name would be able to write to it too; conceptually we would just assume that app A is the only writer of that topic --- The question I had is how much we want to externalize the topic. For example we could, orthogonally to this KIP, just allow users to pass in a customized topic name when constructing a state store, indicating that application A should use that topic as the changelog. Since that topic is created outside of A and is publicly visible on that cluster, anyone could then read it --- including any consumers or Streams apps. This is probably the most flexible option for sharing, but we are even less assured that application A is the only writer to that external topic unless we have explicit auth for A on that topic.
>>>>>>>>
>>>>>>>> Aside from that, here are a few more detailed comments about the implementation design itself, following your current proposal:
>>>>>>>>
>>>>>>>> 1) How do we handle it if the num.partitions of app A's store changelog is different from the num.tasks of app B's sub-topology with that read-only store? Or are we going to let each task of B keep a whole copy of the store of A by reading all of its changelog partitions, like global stores?
>>>>>>>> 2) Are we trying to synchronize the store updates from the changelog to app B's processing timelines, or, just like what we do for global stores, do we just update the read-only stores async?
>>>>>>>> 3) If the answer to both of the above questions is the latter, then what's the main difference between adding a read-only store vs. adding a global store?
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Thu, Jan 20, 2022 at 6:27 AM Daan Gertis <dger...@korfinancial.com> wrote:
>>>>>>>>
>>>>>>>>> Hey Matthias,
>>>>>>>>>
>>>>>>>>> Thank you for that feedback, certainly some things to think about. Let me add my thoughts:
>>>>>>>>>
>>>>>>>>> +1 on simplifying the motivation. I was aiming to add more context, but I think you're right, bringing it back to the essence makes more sense.
>>>>>>>>>
>>>>>>>>> I also follow the reasoning of not having leader and follower. Makes sense to view it from a single-app point of view.
>>>>>>>>>
>>>>>>>>> As for the API method and its parameters, I wanted to stay close to the API for adding a regular state store, but I can perfectly find myself in defining an addReadOnlyStateStore() method instead.
>>>>>>>>>
>>>>>>>>> I agree the processor approach would be the most flexible one, and surely it allows you to use a processor to base the state store off an existing topic. From what I understood from the codebase, there might be a problem when using that state store. Any processor that would use that materialized, read-only state store would need to wait for the store to be restored. I can't find a way to make that possible since processors can't wait for the state store to be restored. Also, since the state store would have logging disabled, it means there is no initial restoration going on. As you wrote, the DSL is already doing this, so I'm pretty sure I'm missing something, just unable to find what exactly.
>>>>>>>>>
>>>>>>>>> I will rewrite the parts in the KIP to make processor-based the preferred choice, along with the changes to the motivation etc. The only thing to figure out is the restoring behavior, to be sure processors of the read-only state store aren't working with stale data.
>>>>>>>>>
>>>>>>>>> D.
>>>>>>>>>
>>>>>>>>> -----Original Message-----
>>>>>>>>> From: Matthias J. Sax <mj...@apache.org>
>>>>>>>>> Sent: 19 January 2022 21:31
>>>>>>>>> To: dev@kafka.apache.org
>>>>>>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>>>>>>
>>>>>>>>> Daan,
>>>>>>>>>
>>>>>>>>> thanks for the KIP. I personally find the motivation section a little bit confusing. If I understand the KIP correctly, you want to read a topic into a state store (ie, materialize it). This is already possible today.
>>>>>>>>>
>>>>>>>>> Of course, today a "second" changelog topic would be created. It seems the KIP aims to avoid the additional changelog topic and to allow re-use of the original input topic (this optimization is already available for the DSL, but not for the PAPI).
>>>>>>>>>
>>>>>>>>> If my observation is correct, we can simplify the motivation accordingly (the fact that you want to use this feature to share state across different applications more efficiently seems to be secondary, and we could omit it IMHO to keep the motivation focused).
>>>>>>>>>
>>>>>>>>> As a result, we also don't need the concept of "leader" and "follower". In the end, Kafka Streams cannot reason about or enforce any usage patterns across different apps; we can only guarantee stuff within a single application (ie, don't create a changelog but reuse an input topic as changelog). It would simplify the KIP if we remove these parts.
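For reference, the DSL-side optimization mentioned above --- reusing the source topic as the table's changelog instead of creating a second topic for source `KTables` --- is opt-in via the Streams topology-optimization setting. A sketch of the relevant config fragment (key and value as in current Kafka releases; verify against the version in use):

```properties
# Enables topology optimizations, including source-topic-as-changelog
# reuse for source KTables built via StreamsBuilder#table().
topology.optimization=all
```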
>>>>>>>>> For the API, I am wondering why you propose to pass in `processorNames`? To me, it seems more reasonable to pass a `ProcessorSupplier` instead (similar to what we do for `addGlobalStore`)? The provided `Processor` must implement a certain pattern, ie, take each input record and apply it unmodified to the state store (ie, the Processor will be solely responsible for maintaining the state store). We might also need to pass in other arguments similar to `addGlobalStore` into this method. (More below.)
>>>>>>>>>
>>>>>>>>> If other processors need to read the state store, they can be connected to it explicitly via `connectProcessorAndStateStores()`? I guess a hybrid approach to keep `processorName` would also be possible, but IMHO all those should only _read_ the state store (but not modify it), to keep a clear conceptual separation.
>>>>>>>>>
>>>>>>>>> About the method name: wondering if we should use a different name to be more explicit about what the method does? Maybe `addReadOnlyStateStore`?
>>>>>>>>>
>>>>>>>>> Btw: please omit any code snippets and only put the newly added method signature in the KIP.
>>>>>>>>>
>>>>>>>>> What I don't yet understand is the section "Allow state stores to continue listening for changes from their changelog". Can you elaborate?
>>>>>>>>>
>>>>>>>>> About:
>>>>>>>>>
>>>>>>>>>> Since a changelog topic is created with the application id in its name, it would allow us to check in the follower if the changelog topic starts with our application id. If it doesn't, we are not allowed to send a log.
>>>>>>>>>
>>>>>>>>> The DSL implements this differently, and just disables the changelog for the state store (ie, for the "follower"). We could do the same thing (either enforcing that the provided `StoreBuilder` has changelogging disabled, or just ignoring it and disabling it hard-coded).
>>>>>>>>>
>>>>>>>>> Ie, overall I would prefer the "source-processor approach" that you put into rejected alternatives. Note that the problem you call out, namely
>>>>>>>>>
>>>>>>>>>> Problem with this approach is the lack of having restoring support within the state store
>>>>>>>>>
>>>>>>>>> does not apply. A restore is absolutely possible, and the DSL already supports it.
>>>>>>>>>
>>>>>>>>> Or is your concern with regard to performance? The "source-processor approach" would have the disadvantage that input data is first deserialized, fed into the Processor, and then serialized again when put into the state store. Re-using the state-restore code is a good idea from a performance point of view, but it might require quite some internal changes (your proposal to "not stop restoring" might not work, as it could trigger quite some undesired side-effects given the current architecture of Kafka Streams).
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 1/16/22 11:52 PM, Daan Gertis wrote:
>>>>>>>>>> Hey everyone,
>>>>>>>>>>
>>>>>>>>>> Just created a KIP on sharing state store state across multiple applications without duplicating the data on multiple changelog topics. Have a look and tell me what you think or what to improve. This is my first one, so please be gentle.
>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/x/q53kCw
>>>>>>>>>>
>>>>>>>>>> Cheers!
>>>>>>>>>> D.
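The "apply each input record unmodified to the state store" contract Matthias describes for the feeding `Processor` can be sketched as below. This is a simplified stand-in (plain interfaces instead of the real `org.apache.kafka.streams.processor.api.Processor` and `KeyValueStore` APIs), intended only to illustrate the pattern, not the KIP implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for a key-value state store.
interface StateStore<K, V> {
    void put(K key, V value);
    V get(K key);
}

// Trivial in-memory implementation for illustration.
class InMemoryStore<K, V> implements StateStore<K, V> {
    private final Map<K, V> map = new HashMap<>();
    public void put(K key, V value) { map.put(key, value); }
    public V get(K key) { return map.get(key); }
}

// The processor backing a read-only store: it is the sole writer, and it
// applies every input record to the store unmodified (no projection, no
// transformation), so the store's content mirrors the input topic exactly,
// matching what a restore from that topic would produce.
class StoreFeedingProcessor<K, V> {
    private final StateStore<K, V> store;

    StoreFeedingProcessor(StateStore<K, V> store) {
        this.store = store;
    }

    void process(K key, V value) {
        store.put(key, value); // write through, unmodified
    }
}
```

Keeping the processor a pure pass-through is what makes the input topic usable as the store's changelog: any projection or transformation in this processor would diverge from what restore-from-topic rebuilds (the KAFKA-7663 issue referenced earlier in the thread).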