Hey everyone!

Thank you for participating.

The KIP-813 vote has passed with:

binding +1s (John, Matthias, Bill)
non-binding +1s (Daan, Federico)


From: John Roesler <vvcep...@apache.org>
Date: Friday, 1 April 2022 at 15:54
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: Re: [DISCUSS] KIP-813 Shared State Stores
Thanks for the replies, Daan,

That all sounds good to me. I think standbys will probably come naturally, but 
we should make sure the implementation includes an integration test to make 
sure. Anyway, I just wanted to make sure we were on the same page.

Thanks again,

On Fri, Apr 1, 2022, at 08:16, Daan Gertis wrote:
> Hey John,
>   *   1. Am I right I’m thinking that there’s no way to enforce the
> stores are actually read-only, right? It seems like the StoreBuilder
> interface is too generic for that. If that’s true, I think it’s fine,
> but we should be sure the JavaDoc clearly states that other processors
> must not write into these stores (except for the one that feeds it).
> Yeah I couldn’t really find a way to limit it easily. We might be able
> to throw unsupported exceptions by wrapping the statestore, but that
> seems kind of brittle to do and feels a bit like a hack.
> Also, the function name clearly states it should be considered readonly.
>   *    2. Are you planning for these stores to get standbys as well? I
> would think so, otherwise the desired purpose of standbys (eliminating
> restoration latency during failover) would not be served.
> Yeah I think standbys should be applicable here as well. But we get
> that by implementing these readonly statestores as regular ones right?
> Cheers,
> D.
> From: John Roesler <vvcep...@apache.org>
> Date: Friday, 1 April 2022 at 04:01
> To: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
> Hi Daan,
> Thanks for the KIP!
> I just got caught up on the discussion. I just have a some small
> questions, and then I will be ready to vote.
> 1. Am I right I’m thinking that there’s no way to enforce the stores
> are actually read-only, right? It seems like the StoreBuilder interface
> is too generic for that. If that’s true, I think it’s fine, but we
> should be sure the JavaDoc clearly states that other processors must
> not write into these stores (except for the one that feeds it).
>  2. Are you planning for these stores to get standbys as well? I would
> think so, otherwise the desired purpose of standbys (eliminating
> restoration latency during failover) would not be served.
> Thanks,
> John
> On Mon, Mar 7, 2022, at 13:13, Matthias J. Sax wrote:
>> Thanks for updating the KIP. LGTM.
>> I think we can start a vote.
>>>  I think this might provide issues if your processor is doing a projection 
>>> of the data.
>> This is correct. It's a know issue:
>> https://issues.apache.org/jira/browse/KAFKA-7663
>> Global-stores/KTables are designed to put the data into the store
>> _unmodified_.
>> -Matthias
>> On 2/28/22 5:05 AM, Daan Gertis wrote:
>>> Updated the KIP to be more aligned with global state store function names.
>>> If I remember correctly during restore the processor will not be used 
>>> right? I think this might provide issues if your processor is doing a 
>>> projection of the data. Either way, I would not add that into this KIP 
>>> since it is a specific use-case pattern.
>>> Unless there is anything more to add or change, I would propose moving to a 
>>> vote?
>>> Cheers!
>>> D.
>>> From: Matthias J. Sax <mj...@apache.org>
>>> Date: Friday, 18 February 2022 at 03:29
>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>> Thanks for updating the KIP!
>>> I am wondering if we would need two overloads of `addReadOnlyStateStore`
>>> one w/ and one w/o `TimestampExtractor` argument to effectively make it
>>> an "optional" parameter?
>>> Also wondering if we need to pass in a `String sourceName` and `String
>>> processorName` parameters (similar to `addGlobalStore()`?) instead if
>>> re-using the store name as currently proposed? -- In general I don't
>>> have a strong opinion either way, but it seems to introduce some API
>>> inconsistency if we don't follow the `addGlobalStore()` pattern?
>>>> Another thing we were confronted with was the restoring of state when the 
>>>> actual local storage is gone. For example, we host on K8s with ephemeral 
>>>> pods, so there is no persisted storage between pod restarts. However, the 
>>>> consumer group will be already been at the latest offset, preventing from 
>>>> previous data to be restored within the new pod’s statestore.
>>> We have already code in-place in the runtime to do the right thing for
>>> this case (ie, via DSL source-table changelog optimization). We can
>>> re-use this part. It's nothing we need to discuss on the KIP, but we can
>>> discuss on the PR later.
>>> -Matthias
>>> On 2/17/22 10:09 AM, Guozhang Wang wrote:
>>>> Hi Daan,
>>>> I think for the read-only state stores you'd need ot slightly augment the
>>>> checkpointing logic so that it would still write the checkpointed offsets
>>>> while restoring from the changelogs.
>>>> Guozhang
>>>> On Thu, Feb 17, 2022 at 7:02 AM Daan Gertis <dger...@korfinancial.com>
>>>> wrote:
>>>>>> Could you add more details about the signature of
>>>>>> `addReadOnlyStateStore()` -- What parameters does it take? Are there any
>>>>>> overloads taking different parameters? The KIP only contains some verbal
>>>>>> description on the "Implementation Plan" section, that is hard to find
>>>>>> and hard to read.
>>>>>> The KIP mentions a `ProcessorProvider` -- do you mean
>>>>> `ProcessorSupplier`?
>>>>>> About timestamp synchronization: why do you propose to disable timestamp
>>>>>> synchronization (similar to global state stores)? It seems to be an
>>>>>> unnecessary limitation? -- Given that we could re-use the new method for
>>>>>> source `KTables` (ie, `StreamsBuilder#table()` implemenation), having
>>>>>> timestamp synchronization enabled seems to be important?
>>>>> Yup, will do these updates. I’ll overload the addReadOnlyStateStore to
>>>>> have allow for timestamp synchronization.
>>>>> Another thing we were confronted with was the restoring of state when the
>>>>> actual local storage is gone. For example, we host on K8s with ephemeral
>>>>> pods, so there is no persisted storage between pod restarts. However, the
>>>>> consumer group will be already been at the latest offset, preventing from
>>>>> previous data to be restored within the new pod’s statestore.
>>>>> If I remember correctly, there was some checkpoint logic available when
>>>>> restoring, but we are bypassing that since logging is disabled on the
>>>>> statestore, no?
>>>>> As always, thanks for your insights.
>>>>> Cheers,
>>>>> D.
>>>>> From: Matthias J. Sax <mj...@apache.org>
>>>>> Date: Wednesday, 16 February 2022 at 02:09
>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>> Thanks for updating the KIP.
>>>>> Could you add more details about the signature of
>>>>> `addReadOnlyStateStore()` -- What parameters does it take? Are there any
>>>>> overloads taking different parameters? The KIP only contains some verbal
>>>>> description on the "Implementation Plan" section, that is hard to find
>>>>> and hard to read.
>>>>> The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?
>>>>> About timestamp synchronization: why do you propose to disable timestamp
>>>>> synchronization (similar to global state stores)? It seems to be an
>>>>> unnecessary limitation? -- Given that we could re-use the new method for
>>>>> source `KTables` (ie, `StreamsBuilder#table()` implemenation), having
>>>>> timestamp synchronization enabled seems to be important?
>>>>> -Matthias
>>>>> On 2/8/22 11:01 PM, Guozhang Wang wrote:
>>>>>> Daan,
>>>>>> Thanks for the replies, those make sense to me.
>>>>>> On Tue, Feb 8, 2022 at 7:24 AM Daan Gertis <dger...@korfinancial.com>
>>>>> wrote:
>>>>>>> I just updated the KIP to reflect the things discussed in this thread.
>>>>>>> As for your questions Guozhang:
>>>>>>>> 1) How do we handle if the num.partitions of app A's store changelog is
>>>>>>>> different from the num.tasks of app B's sub-topology with that
>>>>> read-only
>>>>>>>> store? Or are we going to let each task of B keep a whole copy of the
>>>>>>> store
>>>>>>>> of A by reading all of its changelog partitions, like global stores?
>>>>>>> Good question. Both need to be co-partitioned to have the data
>>>>> available.
>>>>>>> Another option would be to use IQ to make the request, but that seems
>>>>> far
>>>>>>> from ideal.
>>>>>>>> 2) Are we trying to synchronize the store updates from the changelog to
>>>>>>> app
>>>>>>>> B's processing timelines, or just like what we do for global stores
>>>>> that
>>>>>>> we
>>>>>>>> just update the read-only stores async?
>>>>>>> Pretty much the same as we do for global stores.
>>>>>>>> 3) If the answer to both of the above questions are the latter, then
>>>>>>> what's
>>>>>>>> the main difference of adding a read-only store v.s. adding a global
>>>>>>> store?
>>>>>>> I think because of the first answer the behavior differs from global
>>>>>>> stores.
>>>>>>> Makes sense?
>>>>>>> Cheers,
>>>>>>> D.
>>>>>>> From: Matthias J. Sax <mj...@apache.org>
>>>>>>> Date: Thursday, 20 January 2022 at 21:12
>>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>>>>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>>>>> Any processor that would use that materialized, read-only statestore
>>>>>>> would need to wait for the store to be restored. I can't find a way to
>>>>> make
>>>>>>> that possible since processors can't wait for the statestore to be
>>>>> restored.
>>>>>>> This is built into the runtime already. Nothing to worry about. It's
>>>>>>> part of the regular restore logic -- as long as any store is restoring,
>>>>>>> all processing is blocked.
>>>>>>>> Also, since the statestore would have logging disabled, it means there
>>>>>>> is no initial restoration going on.
>>>>>>> No. When we hookup the input topic as changelog (as the DSL does) we
>>>>>>> restore from the input topic during regular restore phase. The restore
>>>>>>> logic does not even know it's reading from the input topic, but not from
>>>>>>> a "*-changelog" topic).
>>>>>>> Disabling changelogging does only affect the write path (ie,
>>>>>>> `store.put()`) but not the restore path due to the internal "hookup" of
>>>>>>> the input topic inside the restore logic.
>>>>>>> It's not easy to find/understand by reverse engineering I guess, but
>>>>>>> it's there.
>>>>>>> One pointer where the actual hookup happens (might help to dig into it
>>>>>>> more if you want):
>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L353-L356
>>>>>>> -Matthias
>>>>>>> On 1/20/22 10:04 AM, Guozhang Wang wrote:
>>>>>>>> Hello Daan,
>>>>>>>> Thanks for writing the KIP. I just read through it and just my 2c here:
>>>>>>> to
>>>>>>>> me it seems that one of the goal would be to "externalize" the internal
>>>>>>>> changelog topic of an application (say A) so that other consumers can
>>>>>>>> directly read them --- though technically without any auth, anyone
>>>>>>> knowing
>>>>>>>> the topic name would be able to write to it too, conceptually we would
>>>>>>> just
>>>>>>>> assume that app A is the only writer of that topic --- The question I
>>>>> had
>>>>>>>> is how much we want to externalize the topic. For example we can,
>>>>>>>> orthogonally to this KIP, just allow users to pass in a customized
>>>>> topic
>>>>>>>> name when constructing a state store, indicating the application A to
>>>>> use
>>>>>>>> that as the changelog, and since that topic is created outside of A and
>>>>>>> is
>>>>>>>> publicly visible to anyone else on that cluster, anyone --- including
>>>>> any
>>>>>>>> consumers, or streams apps. This is probably most flexible as for
>>>>>>> sharing,
>>>>>>>> but we are even less assured that if application A is the only writer
>>>>> to
>>>>>>>> that external topic unless we have explicit auth for A on that topic.
>>>>>>>> Aside of that, here are a few more detailed comments about the
>>>>>>>> implementation design itself following your current proposal:
>>>>>>>> 1) How do we handle if the num.partitions of app A's store changelog is
>>>>>>>> different from the num.tasks of app B's sub-topology with that
>>>>> read-only
>>>>>>>> store? Or are we going to let each task of B keep a whole copy of the
>>>>>>> store
>>>>>>>> of A by reading all of its changelog partitions, like global stores?
>>>>>>>> 2) Are we trying to synchronize the store updates from the changelog to
>>>>>>> app
>>>>>>>> B's processing timelines, or just like what we do for global stores
>>>>> that
>>>>>>> we
>>>>>>>> just update the read-only stores async?
>>>>>>>> 3) If the answer to both of the above questions are the latter, then
>>>>>>> what's
>>>>>>>> the main difference of adding a read-only store v.s. adding a global
>>>>>>> store?
>>>>>>>> Guozhang
>>>>>>>> On Thu, Jan 20, 2022 at 6:27 AM Daan Gertis <dger...@korfinancial.com>
>>>>>>>> wrote:
>>>>>>>>> Hey Matthias,
>>>>>>>>> Thank you for that feedback, certainly some things to think about. Let
>>>>>>> me
>>>>>>>>> add my thoughts:
>>>>>>>>> +1 on simplifying the motivation. Was aiming to add more context but I
>>>>>>>>> think you're right, bringing it back to the essence makes more sense.
>>>>>>>>> I also follow the reasoning of not having leader and follower. Makes
>>>>>>> sense
>>>>>>>>> to view it from a single app point of view.
>>>>>>>>> As for the API method and its parameters, I wanted to stay close to
>>>>> the
>>>>>>>>> API for adding a regular statestore, but I can perfectly find myself
>>>>> in
>>>>>>>>> defining an addReadOnlyStateStore() method instead.
>>>>>>>>> I agree the processor approach would be the most flexible one, and
>>>>>>> surely
>>>>>>>>> it allows you to use a processor to base the statestore off an
>>>>> existing
>>>>>>>>> topic. From what I understood from the codebase, there might be a
>>>>>>> problem
>>>>>>>>> when using that statestore. Any processor that would use that
>>>>>>> materialized,
>>>>>>>>> read-only statestore would need to wait for the store to be restored.
>>>>> I
>>>>>>>>> can't find a way to make that possible since processors can't wait for
>>>>>>> the
>>>>>>>>> statestore to be restored. Also, since the statestore would have
>>>>> logging
>>>>>>>>> disabled, it means there is no initial restoration going on. As you
>>>>>>> wrote,
>>>>>>>>> the DSL is already doing this, so I'm pretty sure I'm missing
>>>>> something,
>>>>>>>>> just unable to find what exactly.
>>>>>>>>> I will rewrite the parts in the KIP to make processor-based the
>>>>>>> preferred
>>>>>>>>> choice, along with the changes to the motivation etc. Only thing to
>>>>>>> figure
>>>>>>>>> out is that restoring behavior to be sure processors of the readonly
>>>>>>>>> statestore aren't working with stale data.
>>>>>>>>> D.
>>>>>>>>> -----Original Message-----
>>>>>>>>> From: Matthias J. Sax <mj...@apache.org>
>>>>>>>>> Sent: 19 January 2022 21:31
>>>>>>>>> To: dev@kafka.apache.org
>>>>>>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>>>>>> Daan,
>>>>>>>>> thanks for the KIP. I personally find the motivation section a little
>>>>>>> bit
>>>>>>>>> confusing. If I understand the KIP correctly, you want to read a topic
>>>>>>> into
>>>>>>>>> a state store (ie, materialize it). This is already possible today.
>>>>>>>>> Of course, today a "second" changelog topic would be created. It seems
>>>>>>> the
>>>>>>>>> KIP aims to avoid the additional changelog topic, and to allow to
>>>>> re-use
>>>>>>>>> the original input topic (this optimization is already available for
>>>>> the
>>>>>>>>> DSL, but not for the PAPI).
>>>>>>>>> If my observation is correct, we can simplify the motivation
>>>>> accordingly
>>>>>>>>> (the fact that you want to use this feature to share state across
>>>>>>> different
>>>>>>>>> applications more efficiently seems to be secondary and we could omit
>>>>> it
>>>>>>>>> IMHO to keep the motivation focused).
>>>>>>>>> As a result, we also don't need to concept of "leader" and "follower".
>>>>>>>>> In the end, Kafka Streams cannot reason/enforce any usage patterns
>>>>>>> across
>>>>>>>>> different apps, but we can only guarantee stuff within a single
>>>>>>> application
>>>>>>>>> (ie, don't create a changelog but reuse an input topic as changelog).
>>>>> It
>>>>>>>>> would simplify the KIP if we remove these parts.
>>>>>>>>> For the API, I am wondering why you propose to pass in
>>>>> `processorNames`?
>>>>>>>>> To me, it seems more reasonable to pass a `ProcessorSupplier` instead
>>>>>>>>> (similar to what we do for `addGlobalStore`)? The provided `Processor`
>>>>>>> must
>>>>>>>>> implement a certain pattern, ie, take each input record an apply it
>>>>>>>>> unmodified to the state store (ie, the Processor will be solely
>>>>>>> responsible
>>>>>>>>> to maintain the state store). We might also need to pass in other
>>>>>>> argument
>>>>>>>>> similar to `addGlobalStore` into this method). (More below.)
>>>>>>>>> If other processors need to read the state store, they can be
>>>>> connected
>>>>>>> to
>>>>>>>>> it explicitly via `connectProcessorAndStateStores()`? I guess a hybrid
>>>>>>>>> approach to keep `processorName` would also be possible, but IMHO all
>>>>>>> those
>>>>>>>>> should only _read_ the state store (but not modify it), to keep a
>>>>> clear
>>>>>>>>> conceptual separation.
>>>>>>>>> About the method name: wondering if we should use a different name to
>>>>> be
>>>>>>>>> more explicit what the method does? Maybe `addReadOnlyStateStore`?
>>>>>>>>> Btw: please omit any code snippets and only put the newly added method
>>>>>>>>> signature in the KIP.
>>>>>>>>> What I don't yet understand is the section "Allow state stores to
>>>>>>>>> continue listening for changes from their changelog". Can you
>>>>> elaborate?
>>>>>>>>> About:
>>>>>>>>>> Since a changelog topic is created with the application id in it’s
>>>>>>> name,
>>>>>>>>> it would allow us to check in the follower if the changelog topic
>>>>> starts
>>>>>>>>> with our application id. If it doesn’t, we are not allowed to send a
>>>>>>> log.
>>>>>>>>> The DSL implements this differently, and just disabled the changelog
>>>>> for
>>>>>>>>> the state store (ie, for the "follower"). We could do the same thing
>>>>>>>>> (either enforcing that the provided `StoreBuilder` has changelogging
>>>>>>>>> disabled, or by just ignoring it and disabled it hard coded).
>>>>>>>>> Ie, overall I would prefer the "source-procssor appraoch" that you put
>>>>>>>>> into rejected alternatives. Note that the problem you call out, namely
>>>>>>>>>> Problem with this approach is the lack of having restoring support
>>>>>>>>> within the state store
>>>>>>>>> does not apply. A restore it absolutely possible and the DSL already
>>>>>>>>> supports it.
>>>>>>>>> Or is your concern with regard to performance? The "source-processor
>>>>>>>>> approach" would have the disadvantage that input data is first
>>>>>>>>> deserialized, fed into the Processor, and than serialized again when
>>>>> put
>>>>>>>>> into the state store. Re-using the state restore code is a good idea
>>>>>>>>> from a performance point of view, but it might require quite some
>>>>>>>>> internal changes (your proposal to "not stop restoring" might not work
>>>>>>>>> as it could trigger quite some undesired side-effects given the
>>>>> current
>>>>>>>>> architecture of Kafka Streams).
>>>>>>>>> -Matthias
>>>>>>>>> On 1/16/22 11:52 PM, Daan Gertis wrote:
>>>>>>>>>> Hey everyone,
>>>>>>>>>> Just created a KIP on sharing statestore state across multiple
>>>>>>>>> applications without duplicating the data on multiple changelog
>>>>> topics.
>>>>>>>>> Have a look and tell me what you think or what to improve. This is my
>>>>>>> first
>>>>>>>>> one, so please be gentle ??
>>>>>>>>>> https://cwiki.apache.org/confluence/x/q53kCw
>>>>>>>>>> Cheers!
>>>>>>>>>> D.

Reply via email to