Re: [DISCUSS] KIP-813 Shared State Stores

2022-04-19 Thread Daan Gertis
Hey everyone!

Thank you for participating.

The KIP-813 vote has passed with:

binding +1s (John, Matthias, Bill)
non-binding +1s (Daan, Federico)

Cheers,
D.


From: John Roesler 
Date: Friday, 1 April 2022 at 15:54
To: dev@kafka.apache.org 
Subject: Re: [DISCUSS] KIP-813 Shared State Stores
Thanks for the replies, Daan,

That all sounds good to me. I think standbys will probably come naturally, but 
we should make sure the implementation includes an integration test to verify 
that. Anyway, I just wanted to make sure we were on the same page.

Thanks again,
John

On Fri, Apr 1, 2022, at 08:16, Daan Gertis wrote:
> Hey John,
>
>
>   *   1. Am I right in thinking that there’s no way to enforce that the
> stores are actually read-only? It seems like the StoreBuilder
> interface is too generic for that. If that’s true, I think it’s fine,
> but we should be sure the JavaDoc clearly states that other processors
> must not write into these stores (except for the one that feeds it).
>
> Yeah, I couldn’t really find a way to limit it easily. We might be able
> to throw unsupported exceptions by wrapping the statestore, but that
> seems kind of brittle and feels a bit like a hack.
>
> Also, the function name clearly states it should be considered read-only.
>
>
>   *   2. Are you planning for these stores to get standbys as well? I
> would think so, otherwise the desired purpose of standbys (eliminating
> restoration latency during failover) would not be served.
>
> Yeah, I think standbys should be applicable here as well. But we get
> that by implementing these read-only statestores as regular ones, right?
>
> Cheers,
> D.
>
>
> From: John Roesler 
> Date: Friday, 1 April 2022 at 04:01
> To: dev@kafka.apache.org 
> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
> Hi Daan,
>
> Thanks for the KIP!
>
> I just got caught up on the discussion. I have a few small
> questions, and then I will be ready to vote.
>
> 1. Am I right in thinking that there’s no way to enforce that the stores
> are actually read-only? It seems like the StoreBuilder interface
> is too generic for that. If that’s true, I think it’s fine, but we
> should be sure the JavaDoc clearly states that other processors must
> not write into these stores (except for the one that feeds it).
>
> 2. Are you planning for these stores to get standbys as well? I would
> think so, otherwise the desired purpose of standbys (eliminating
> restoration latency during failover) would not be served.
>
> Thanks,
> John
>
> On Mon, Mar 7, 2022, at 13:13, Matthias J. Sax wrote:
>> Thanks for updating the KIP. LGTM.
>>
>> I think we can start a vote.
>>
>>
>>>  I think this might cause issues if your processor is doing a projection 
>>> of the data.
>>
>> This is correct. It's a known issue:
>> https://issues.apache.org/jira/browse/KAFKA-7663
>>
>> Global-stores/KTables are designed to put the data into the store
>> _unmodified_.
>>
>>
>> -Matthias
>>
>> On 2/28/22 5:05 AM, Daan Gertis wrote:
>>> Updated the KIP to be more aligned with global state store function names.
>>>
>>> If I remember correctly, the processor will not be used during restore, 
>>> right? I think this might cause issues if your processor is doing a 
>>> projection of the data. Either way, I would not add that to this KIP 
>>> since it is a specific use-case pattern.
>>>
>>> Unless there is anything more to add or change, I would propose moving to a 
>>> vote?
>>>
>>> Cheers!
>>> D.
>>>
>>> From: Matthias J. Sax 
>>> Date: Friday, 18 February 2022 at 03:29
>>> To: dev@kafka.apache.org 
>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>> Thanks for updating the KIP!
>>>
>>> I am wondering if we would need two overloads of `addReadOnlyStateStore`,
>>> one w/ and one w/o a `TimestampExtractor` argument, to effectively make it
>>> an "optional" parameter?
>>>
>>> Also wondering if we need to pass in `String sourceName` and `String
>>> processorName` parameters (similar to `addGlobalStore()`?) instead of
>>> re-using the store name as currently proposed? -- In general I don't
>>> have a strong opinion either way, but it seems to introduce some API
>>> inconsistency if we don't follow the `addGlobalStore()` pattern?
>>>
>>>
>>>> Another thing we were confronted with was the restoring of state when the 
>>>> actual local storage is gone. For example, we host on K8s with ephemeral 
>>>> pods, so there is 

Re: [VOTE] KIP-813 Shared State Stores

2022-04-12 Thread Daan Gertis
Cool! I’ll move forward and start preparing the PR for it.

Cheers!
D.

From: Bill Bejeck 
Date: Tuesday, 5 April 2022 at 18:17
To: dev 
Subject: Re: [VOTE] KIP-813 Shared State Stores
Thanks for the KIP, Daan.

I've caught up on the discussion thread and I've gone over the KIP.  This
seems like a good addition to me.

+1 (binding)

Thanks,
Bill

On Fri, Apr 1, 2022 at 2:13 PM Matthias J. Sax  wrote:

> +1 (binding)
>
>
> On 4/1/22 6:47 AM, John Roesler wrote:
> > Thanks for the KIP, Daan!
> >
> > I’m +1 (binding)
> >
> > -John
> >
> > On Tue, Mar 29, 2022, at 06:01, Daan Gertis wrote:
> >> I would like to start a vote on this one:
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-813%3A+Shareable+State+Stores
> >>
> >> Cheers,
> >> D.
>


Re: [DISCUSS] KIP-813 Shared State Stores

2022-04-01 Thread Daan Gertis
Hey John,


  *   1. Am I right in thinking that there’s no way to enforce that the stores are 
actually read-only? It seems like the StoreBuilder interface is too 
generic for that. If that’s true, I think it’s fine, but we should be sure the 
JavaDoc clearly states that other processors must not write into these stores 
(except for the one that feeds it).

Yeah, I couldn’t really find a way to limit it easily. We might be able to throw 
unsupported exceptions by wrapping the statestore, but that seems kind of 
brittle and feels a bit like a hack.

Also, the function name clearly states it should be considered read-only.
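
For illustration, a minimal sketch of such a wrapper (hypothetical class, not 
part of the KIP), assuming a KeyValueStore being guarded against writes:

    import java.util.List;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.processor.StateStoreContext;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Hypothetical wrapper: reads delegate to the inner store, writes throw.
    public class ReadOnlyStoreFacade<K, V> implements KeyValueStore<K, V> {
        private final KeyValueStore<K, V> inner;

        public ReadOnlyStoreFacade(final KeyValueStore<K, V> inner) {
            this.inner = inner;
        }

        // Read path and lifecycle calls are passed through unchanged.
        @Override public V get(final K key) { return inner.get(key); }
        @Override public KeyValueIterator<K, V> range(final K from, final K to) { return inner.range(from, to); }
        @Override public KeyValueIterator<K, V> all() { return inner.all(); }
        @Override public long approximateNumEntries() { return inner.approximateNumEntries(); }
        @Override public String name() { return inner.name(); }
        @Override public void init(final StateStoreContext context, final StateStore root) { inner.init(context, root); }
        @Deprecated
        @Override public void init(final org.apache.kafka.streams.processor.ProcessorContext context, final StateStore root) { inner.init(context, root); }
        @Override public void flush() { inner.flush(); }
        @Override public void close() { inner.close(); }
        @Override public boolean persistent() { return inner.persistent(); }
        @Override public boolean isOpen() { return inner.isOpen(); }

        // Write path is rejected -- the "brittle" part: nothing stops a caller
        // from obtaining and writing to the underlying store directly.
        @Override public void put(final K key, final V value) { throw new UnsupportedOperationException("read-only store"); }
        @Override public V putIfAbsent(final K key, final V value) { throw new UnsupportedOperationException("read-only store"); }
        @Override public void putAll(final List<KeyValue<K, V>> entries) { throw new UnsupportedOperationException("read-only store"); }
        @Override public V delete(final K key) { throw new UnsupportedOperationException("read-only store"); }
    }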


  *   2. Are you planning for these stores to get standbys as well? I would 
think so, otherwise the desired purpose of standbys (eliminating restoration 
latency during failover) would not be served.

Yeah, I think standbys should be applicable here as well. But we get that by 
implementing these read-only statestores as regular ones, right?
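
For reference, standbys for regular stores come from the existing config, so 
presumably the same knob would cover these stores too (a sketch, assuming no 
KIP-specific setting is needed):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    // One standby replica per stateful task, kept warm from the changelog.
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);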

Cheers,
D.


From: John Roesler 
Date: Friday, 1 April 2022 at 04:01
To: dev@kafka.apache.org 
Subject: Re: [DISCUSS] KIP-813 Shared State Stores
Hi Daan,

Thanks for the KIP!

I just got caught up on the discussion. I have a few small questions, and 
then I will be ready to vote.

1. Am I right in thinking that there’s no way to enforce that the stores are 
actually read-only? It seems like the StoreBuilder interface is too 
generic for that. If that’s true, I think it’s fine, but we should be sure the 
JavaDoc clearly states that other processors must not write into these stores 
(except for the one that feeds it).

2. Are you planning for these stores to get standbys as well? I would think 
so, otherwise the desired purpose of standbys (eliminating restoration latency 
during failover) would not be served.

Thanks,
John

On Mon, Mar 7, 2022, at 13:13, Matthias J. Sax wrote:
> Thanks for updating the KIP. LGTM.
>
> I think we can start a vote.
>
>
>>  I think this might cause issues if your processor is doing a projection 
>> of the data.
>
> This is correct. It's a known issue:
> https://issues.apache.org/jira/browse/KAFKA-7663
>
> Global-stores/KTables are designed to put the data into the store
> _unmodified_.
>
>
> -Matthias
>
> On 2/28/22 5:05 AM, Daan Gertis wrote:
>> Updated the KIP to be more aligned with global state store function names.
>>
>> If I remember correctly, the processor will not be used during restore, right? 
>> I think this might cause issues if your processor is doing a projection of 
>> the data. Either way, I would not add that to this KIP since it is a 
>> specific use-case pattern.
>>
>> Unless there is anything more to add or change, I would propose moving to a 
>> vote?
>>
>> Cheers!
>> D.
>>
>> From: Matthias J. Sax 
>> Date: Friday, 18 February 2022 at 03:29
>> To: dev@kafka.apache.org 
>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>> Thanks for updating the KIP!
>>
>> I am wondering if we would need two overloads of `addReadOnlyStateStore`,
>> one w/ and one w/o a `TimestampExtractor` argument, to effectively make it
>> an "optional" parameter?
>>
>> Also wondering if we need to pass in `String sourceName` and `String
>> processorName` parameters (similar to `addGlobalStore()`?) instead of
>> re-using the store name as currently proposed? -- In general I don't
>> have a strong opinion either way, but it seems to introduce some API
>> inconsistency if we don't follow the `addGlobalStore()` pattern?
>>
>>
>>> Another thing we were confronted with was the restoring of state when the 
>>> actual local storage is gone. For example, we host on K8s with ephemeral 
>>> pods, so there is no persisted storage between pod restarts. However, the 
>>> consumer group will already be at the latest offset, preventing previous 
>>> data from being restored into the new pod’s statestore.
>>
>> We already have code in place in the runtime to do the right thing for
>> this case (i.e., via the DSL source-table changelog optimization). We can
>> re-use this part. It's nothing we need to discuss on the KIP, but we can
>> discuss on the PR later.
>>
>>
>> -Matthias
>>
>>
>> On 2/17/22 10:09 AM, Guozhang Wang wrote:
>>> Hi Daan,
>>>
>>> I think for the read-only state stores you'd need to slightly augment the
>>> checkpointing logic so that it would still write the checkpointed offsets
>>> while restoring from the changelogs.
>>>
>>>
>>> Guozhang
>>>
>>> On Thu, Feb 17, 2022 at 7:02 AM Daan Gertis 
>>> wrote:
>>>
>>>>> C

[VOTE] KIP-813 Shared State Stores

2022-03-29 Thread Daan Gertis
I would like to start a vote on this one:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-813%3A+Shareable+State+Stores

Cheers,
D.


Kafka Streams Issue

2022-03-28 Thread Daan Gertis
Hi All,

We are experiencing some weird behaviour with our interactive query service 
implementation.
This is the flow we’ve implemented:


  1.  kafkaStreams.queryMetadataForKey(store, key, serializer) returns 
activeHost HostInfo{host='localhost', port=8562} and standbyHosts [] for the 
store and partition where the key would reside. We are not interested in 
standby hosts. Luckily, we have an active host which we can call.
  2.  We make an HTTP call to host localhost:8562, asking for the key there.
  3.  Inside the 8562 host, we retrieve the store by calling 
kafkaStreams.store(parameters), using parameters with staleStores set to false.
  4.  We call kafkaStreams.state().equals(RUNNING) to make sure we’re in the 
RUNNING state.
  5.  Now we call store.get(key) in order to retrieve the key from the store, 
if it has been stored there.
  6.  The get method on our store implementation calls the 
storeProvider.stores(storeName, storeType) method to iterate over all the 
stores available on the host.
  7.  The storeProvider is a WrappingStoreProvider, which calls 
storeProvider.stores(storeQueryParameters) for each 
StreamThreadStateStoreProvider it wraps (just one in our case).
  8.  As the logic inside that stores method finds that the StreamThread is in 
the RUNNING state, it retrieves the tasks based on 
storeQueryParams.staleStoresEnabled() ? streamThread.allTasks().values() : 
streamThread.activeTasks(); since we set staleStores to false in the params, 
this evaluates to streamThread.activeTasks().
  9.  To our surprise, the streamThread.activeTasks() method returns an empty 
ArrayList, while the streamThread.allTasks().values() returns one StandbyTask 
for the store we’re looking for.
  10. As there appear to be no active tasks on this host for this store, we 
return the fabled “The state store, " + storeName + ", may have migrated to 
another instance.” InvalidStateStoreException.

This flow is quite tricky, as queryMetadataForKey returns an active host 
which turns out to only have a standby task once queried.
I have executed the queryMetadataForKey method on the active host as well, once 
before calling kafkaStreams.store in step 3, and another time between steps 4 
and 5. Each time the metadata is the same: the host we’re on at that 
moment is the active host.
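
Condensed into code, the flow above looks roughly like this (store name, key, 
serializer, and the kafkaStreams instance are placeholders):

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyQueryMetadata;
    import org.apache.kafka.streams.StoreQueryParameters;
    import org.apache.kafka.streams.state.HostInfo;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    // Step 1: locate the active host for the key.
    final KeyQueryMetadata metadata =
        kafkaStreams.queryMetadataForKey("my-store", key, keySerializer);
    final HostInfo active = metadata.activeHost();  // e.g. localhost:8562

    // Step 2: HTTP hop to that host. On the active host (steps 3-5):
    final ReadOnlyKeyValueStore<String, String> store = kafkaStreams.store(
        StoreQueryParameters.fromNameAndType("my-store",
            QueryableStoreTypes.<String, String>keyValueStore()));  // staleStores defaults to false
    if (kafkaStreams.state() == KafkaStreams.State.RUNNING) {       // step 4
        // Step 5: this is where the "may have migrated to another instance"
        // InvalidStateStoreException surfaces in the case described.
        final String value = store.get(key);
    }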

Could it be there is a difference between activeHost and activeTask?

Those also on the Confluent community Slack might recognize this message, as 
it has been posted there by our CTO as well.

Cheers,
D.


Re: [DISCUSS] KIP-813 Shared State Stores

2022-02-28 Thread Daan Gertis
Updated the KIP to be more aligned with global state store function names.

If I remember correctly, the processor will not be used during restore, right? I 
think this might cause issues if your processor is doing a projection of the 
data. Either way, I would not add that to this KIP since it is a specific 
use-case pattern.

Unless there is anything more to add or change, I would propose moving to a 
vote?

Cheers!
D.

From: Matthias J. Sax 
Date: Friday, 18 February 2022 at 03:29
To: dev@kafka.apache.org 
Subject: Re: [DISCUSS] KIP-813 Shared State Stores
Thanks for updating the KIP!

I am wondering if we would need two overloads of `addReadOnlyStateStore`,
one w/ and one w/o a `TimestampExtractor` argument, to effectively make it
an "optional" parameter?

Also wondering if we need to pass in `String sourceName` and `String
processorName` parameters (similar to `addGlobalStore()`?) instead of
re-using the store name as currently proposed? -- In general I don't
have a strong opinion either way, but it seems to introduce some API
inconsistency if we don't follow the `addGlobalStore()` pattern?
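
To make the question concrete, the two overloads could look roughly like this, 
following the `addGlobalStore()` parameter pattern (a sketch of the API under 
discussion, not a settled signature):

    // On Topology; `sourceName`/`processorName` mirror addGlobalStore().
    public synchronized <K, V> Topology addReadOnlyStateStore(
        final StoreBuilder<?> storeBuilder,
        final String sourceName,
        final Deserializer<K> keyDeserializer,
        final Deserializer<V> valueDeserializer,
        final String topic,
        final String processorName,
        final ProcessorSupplier<K, V, Void, Void> stateUpdateSupplier);

    // Overload making the TimestampExtractor an "optional" parameter:
    public synchronized <K, V> Topology addReadOnlyStateStore(
        final StoreBuilder<?> storeBuilder,
        final String sourceName,
        final TimestampExtractor timestampExtractor,
        final Deserializer<K> keyDeserializer,
        final Deserializer<V> valueDeserializer,
        final String topic,
        final String processorName,
        final ProcessorSupplier<K, V, Void, Void> stateUpdateSupplier);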


> Another thing we were confronted with was the restoring of state when the 
> actual local storage is gone. For example, we host on K8s with ephemeral 
> pods, so there is no persisted storage between pod restarts. However, the 
> consumer group will already be at the latest offset, preventing previous 
> data from being restored into the new pod’s statestore.

We already have code in place in the runtime to do the right thing for
this case (i.e., via the DSL source-table changelog optimization). We can
re-use this part. It's nothing we need to discuss on the KIP, but we can
discuss on the PR later.
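
The DSL optimization referred to here is visible with a materialized source 
table (a sketch; topic and store names made up):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;

    final Properties props = new Properties();
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

    final StreamsBuilder builder = new StreamsBuilder();
    // With the optimization enabled, no extra "-changelog" topic is created for
    // this store: on restore, the runtime replays "input-topic" directly into it.
    final KTable<String, String> table = builder.table("input-topic",
        Consumed.with(Serdes.String(), Serdes.String()),
        Materialized.as("source-table-store"));
    // Pass props to builder.build(props) so the optimization is actually applied.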


-Matthias


On 2/17/22 10:09 AM, Guozhang Wang wrote:
> Hi Daan,
>
> I think for the read-only state stores you'd need to slightly augment the
> checkpointing logic so that it would still write the checkpointed offsets
> while restoring from the changelogs.
>
>
> Guozhang
>
> On Thu, Feb 17, 2022 at 7:02 AM Daan Gertis 
> wrote:
>
>>> Could you add more details about the signature of
>>> `addReadOnlyStateStore()` -- What parameters does it take? Are there any
>>> overloads taking different parameters? The KIP only contains some verbal
>>> description on the "Implementation Plan" section, that is hard to find
>>> and hard to read.
>>>
>>> The KIP mentions a `ProcessorProvider` -- do you mean
>> `ProcessorSupplier`?
>>>
>>> About timestamp synchronization: why do you propose to disable timestamp
>>> synchronization (similar to global state stores)? It seems to be an
>>> unnecessary limitation? -- Given that we could re-use the new method for
>>> source `KTables` (i.e., `StreamsBuilder#table()` implementation), having
>>> timestamp synchronization enabled seems to be important?
>>
>> Yup, will do these updates. I’ll overload the addReadOnlyStateStore to
>> allow for timestamp synchronization.
>>
>> Another thing we were confronted with was the restoring of state when the
>> actual local storage is gone. For example, we host on K8s with ephemeral
>> pods, so there is no persisted storage between pod restarts. However, the
>> consumer group will already be at the latest offset, preventing previous 
>> data from being restored into the new pod’s statestore.
>>
>> If I remember correctly, there was some checkpoint logic available when
>> restoring, but we are bypassing that since logging is disabled on the
>> statestore, no?
>>
>> As always, thanks for your insights.
>>
>> Cheers,
>> D.
>>
>>
>> From: Matthias J. Sax 
>> Date: Wednesday, 16 February 2022 at 02:09
>> To: dev@kafka.apache.org 
>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>> Thanks for updating the KIP.
>>
>> Could you add more details about the signature of
>> `addReadOnlyStateStore()` -- What parameters does it take? Are there any
>> overloads taking different parameters? The KIP only contains some verbal
>> description on the "Implementation Plan" section, that is hard to find
>> and hard to read.
>>
>> The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?
>>
>> About timestamp synchronization: why do you propose to disable timestamp
>> synchronization (similar to global state stores)? It seems to be an
>> unnecessary limitation? -- Given that we could re-use the new method for
>> source `KTables` (i.e., `StreamsBuilder#table()` implementation), having
>> timestamp synchronization enabled seems to be important?
>>
>>
>> -Matthias
>>
>>
>> On 2/8/22 

Re: [DISCUSS] KIP-813 Shared State Stores

2022-02-17 Thread Daan Gertis
> Could you add more details about the signature of
> `addReadOnlyStateStore()` -- What parameters does it take? Are there any
> overloads taking different parameters? The KIP only contains some verbal
> description on the "Implementation Plan" section, that is hard to find
> and hard to read.
>
> The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?
>
> About timestamp synchronization: why do you propose to disable timestamp
> synchronization (similar to global state stores)? It seems to be an
> unnecessary limitation? -- Given that we could re-use the new method for
> source `KTables` (i.e., `StreamsBuilder#table()` implementation), having
> timestamp synchronization enabled seems to be important?

Yup, will do these updates. I’ll overload the addReadOnlyStateStore to 
allow for timestamp synchronization.

Another thing we were confronted with was the restoring of state when the 
actual local storage is gone. For example, we host on K8s with ephemeral pods, 
so there is no persisted storage between pod restarts. However, the consumer 
group will already be at the latest offset, preventing previous data 
from being restored into the new pod’s statestore.

If I remember correctly, there was some checkpoint logic available when 
restoring, but we are bypassing that since logging is disabled on the 
statestore, no?
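
For context, the checkpoint logic in question boils down to the task writing 
the restored offsets to its local checkpoint file. A rough sketch using the 
internal OffsetCheckpoint class (internal API; stateDir and restoredOffset are 
placeholders, shown only to illustrate the idea):

    import java.io.File;
    import java.io.IOException;
    import java.util.Collections;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

    // After restoring up to `restoredOffset` from the input-as-changelog topic,
    // persist it so a restart does not replay the topic from the beginning.
    final OffsetCheckpoint checkpoint =
        new OffsetCheckpoint(new File(stateDir, ".checkpoint"));
    checkpoint.write(Collections.singletonMap(
        new TopicPartition("input-topic", 0), restoredOffset));  // throws IOException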

As always, thanks for your insights.

Cheers,
D.


From: Matthias J. Sax 
Date: Wednesday, 16 February 2022 at 02:09
To: dev@kafka.apache.org 
Subject: Re: [DISCUSS] KIP-813 Shared State Stores
Thanks for updating the KIP.

Could you add more details about the signature of
`addReadOnlyStateStore()` -- What parameters does it take? Are there any
overloads taking different parameters? The KIP only contains some verbal
description on the "Implementation Plan" section, that is hard to find
and hard to read.

The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?

About timestamp synchronization: why do you propose to disable timestamp
synchronization (similar to global state stores)? It seems to be an
unnecessary limitation? -- Given that we could re-use the new method for
source `KTables` (i.e., `StreamsBuilder#table()` implementation), having
timestamp synchronization enabled seems to be important?
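
For reference, the `TimestampExtractor` in question is the standard interface; 
a minimal custom implementation (hypothetical class name) looks like:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Timestamp synchronization orders records across inputs by these timestamps.
    public class RecordTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
            // Fall back to the previous partition time if the record timestamp is invalid.
            return record.timestamp() >= 0 ? record.timestamp() : partitionTime;
        }
    }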


-Matthias


On 2/8/22 11:01 PM, Guozhang Wang wrote:
> Daan,
>
> Thanks for the replies, those make sense to me.
>
> On Tue, Feb 8, 2022 at 7:24 AM Daan Gertis  wrote:
>
>> I just updated the KIP to reflect the things discussed in this thread.
>>
>> As for your questions Guozhang:
>>
>>> 1) How do we handle if the num.partitions of app A's store changelog is
>>> different from the num.tasks of app B's sub-topology with that read-only
>>> store? Or are we going to let each task of B keep a whole copy of the
>> store
>>> of A by reading all of its changelog partitions, like global stores?
>>
>> Good question. Both need to be co-partitioned to have the data available.
>> Another option would be to use IQ to make the request, but that seems far
>> from ideal.
>>
>>> 2) Are we trying to synchronize the store updates from the changelog to
>> app
>>> B's processing timelines, or just like what we do for global stores that
>> we
>>> just update the read-only stores async?
>>
>> Pretty much the same as we do for global stores.
>>
>>> 3) If the answer to both of the above questions is the latter, then
>> what's
>>> the main difference of adding a read-only store v.s. adding a global
>> store?
>>
>> I think because of the first answer the behavior differs from global
>> stores.
>>
>> Makes sense?
>>
>> Cheers,
>>
>> D.
>>
>> From: Matthias J. Sax 
>> Date: Thursday, 20 January 2022 at 21:12
>> To: dev@kafka.apache.org 
>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>> Any processor that would use that materialized, read-only statestore
>> would need to wait for the store to be restored. I can't find a way to make
>> that possible since processors can't wait for the statestore to be restored.
>>
>> This is built into the runtime already. Nothing to worry about. It's
>> part of the regular restore logic -- as long as any store is restoring,
>> all processing is blocked.
>>
>>> Also, since the statestore would have logging disabled, it means there
>> is no initial restoration going on.
>>
> No. When we hook up the input topic as the changelog (as the DSL does) we
> restore from the input topic during the regular restore phase. The restore
> logic does not even know it's reading from the input topic rather than from
>> a "*-change

Re: [DISCUSS] KIP-813 Shared State Stores

2022-02-08 Thread Daan Gertis
I just updated the KIP to reflect the things discussed in this thread.

As for your questions Guozhang:

> 1) How do we handle if the num.partitions of app A's store changelog is
> different from the num.tasks of app B's sub-topology with that read-only
> store? Or are we going to let each task of B keep a whole copy of the store
> of A by reading all of its changelog partitions, like global stores?

Good question. Both need to be co-partitioned to have the data available. 
Another option would be to use IQ to make the request, but that seems far from 
ideal.
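
To illustrate the co-partitioning requirement (topic names and counts assumed): 
both topics need the same number of partitions, so that task i of app B reads 
partition i of app A's changelog.

    import java.util.List;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.NewTopic;

    try (final Admin admin = Admin.create(adminProps)) {  // adminProps: bootstrap config
        admin.createTopics(List.of(
            new NewTopic("app-a-store-changelog", 4, (short) 3),
            new NewTopic("app-b-input", 4, (short) 3)     // same partition count: 4
        )).all().get();                                   // throws on failure
    }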

> 2) Are we trying to synchronize the store updates from the changelog to app
> B's processing timelines, or just like what we do for global stores that we
> just update the read-only stores async?

Pretty much the same as we do for global stores.

> 3) If the answer to both of the above questions is the latter, then what's
> the main difference of adding a read-only store v.s. adding a global store?

I think because of the first answer the behavior differs from global stores.

Makes sense?

Cheers,

D.

From: Matthias J. Sax 
Date: Thursday, 20 January 2022 at 21:12
To: dev@kafka.apache.org 
Subject: Re: [DISCUSS] KIP-813 Shared State Stores
> Any processor that would use that materialized, read-only statestore would 
> need to wait for the store to be restored. I can't find a way to make that 
> possible since processors can't wait for the statestore to be restored.

This is built into the runtime already. Nothing to worry about. It's
part of the regular restore logic -- as long as any store is restoring,
all processing is blocked.

> Also, since the statestore would have logging disabled, it means there is no 
> initial restoration going on.

No. When we hook up the input topic as the changelog (as the DSL does) we
restore from the input topic during the regular restore phase. The restore
logic does not even know it's reading from the input topic rather than from
a "*-changelog" topic.

Disabling changelogging only affects the write path (i.e.,
`store.put()`) but not the restore path, due to the internal "hookup" of
the input topic inside the restore logic.

It's not easy to find/understand by reverse engineering I guess, but
it's there.

One pointer where the actual hookup happens (might help to dig into it
more if you want):
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L353-L356
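
As a sketch of the two paths: disabling logging only silences the write path; 
restore still works because the input topic is registered as the store's 
changelog internally, so there is nothing to configure on the builder:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("shared-store"),
                Serdes.String(), Serdes.String())
            .withLoggingDisabled();  // write path: no "-changelog" topic is produced to
    // Restore path: the internal hookup linked above treats the input topic
    // itself as the changelog during restoration.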


-Matthias


On 1/20/22 10:04 AM, Guozhang Wang wrote:
> Hello Daan,
>
> Thanks for writing the KIP. I just read through it and just my 2c here: to
> me it seems that one of the goals would be to "externalize" the internal
> changelog topic of an application (say A) so that other consumers can
> directly read it --- though technically without any auth, anyone knowing
> the topic name would be able to write to it too; conceptually we would just
> assume that app A is the only writer of that topic --- The question I had
> is how much we want to externalize the topic. For example we could,
> orthogonally to this KIP, just allow users to pass in a customized topic
> name when constructing a state store, telling application A to use
> that as the changelog; since that topic is created outside of A and is
> publicly visible to anyone else on that cluster, anyone --- including any
> consumers or Streams apps --- could read it. This is probably the most
> flexible option for sharing, but we are even less assured that application
> A is the only writer to that external topic unless we have explicit auth
> for A on that topic.
>
> Aside of that, here are a few more detailed comments about the
> implementation design itself following your current proposal:
>
> 1) How do we handle if the num.partitions of app A's store changelog is
> different from the num.tasks of app B's sub-topology with that read-only
> store? Or are we going to let each task of B keep a whole copy of the store
> of A by reading all of its changelog partitions, like global stores?
> 2) Are we trying to synchronize the store updates from the changelog to app
> B's processing timelines, or just like what we do for global stores that we
> just update the read-only stores async?
> 3) If the answer to both of the above questions is the latter, then what's
> the main difference of adding a read-only store v.s. adding a global store?
>
> Guozhang
>
>
>
> On Thu, Jan 20, 2022 at 6:27 AM Daan Gertis 
> wrote:
>
>> Hey Matthias,
>>
>> Thank you for that feedback, certainly some things to think about. Let me
>> add my thoughts:
>>
>> +1 on simplifying the motivation. Was aiming to add more context but I
>> think you're right, bringing it back to the essence makes more sense.
>>
>> I also follow the reasoning of not

RE: [DISCUSS] KIP-813 Shared State Stores

2022-01-20 Thread Daan Gertis
pproach" would have the disadvantage that input data is first 
deserialized, fed into the Processor, and then serialized again when put 
into the state store. Re-using the state restore code is a good idea 
from a performance point of view, but it might require quite some 
internal changes (your proposal to "not stop restoring" might not work 
as it could trigger quite some undesired side-effects given the current 
architecture of Kafka Streams).


-Matthias




On 1/16/22 11:52 PM, Daan Gertis wrote:
> Hey everyone,
> 
> Just created a KIP on sharing statestore state across multiple applications 
> without duplicating the data on multiple changelog topics. Have a look and 
> tell me what you think or what to improve. This is my first one, so please be 
> gentle 
> 
> https://cwiki.apache.org/confluence/x/q53kCw
> 
> Cheers!
> D.


Re: [DISCUSS] KIP-813: Replace KafkaConsumer with AdminClient in GetOffsetShell

2022-01-17 Thread Daan Gertis
Hey Deng,

Seems like we ran into a bit of a split-brain situation here. I just created 
KIP-813 based on the last number in the wiki page and only now see your 
proposal. Should have checked here as well, sorry.

Would you be able/willing to move to KIP-814 and register it on the KIP page as 
well? I just finished linking it through the wiki, so it would be a bit of work 
to change it (but doable).

Tell me what you think!

Cheers,
D.

From: deng ziming 
Date: Friday, 14 January 2022 at 14:42
To: dev@kafka.apache.org 
Subject: [DISCUSS] KIP-813: Replace KafkaConsumer with AdminClient in 
GetOffsetShell
Hi everyone,

I would like to start a discussion for KIP-813
https://cwiki.apache.org/confluence/display/KAFKA/KIP-813%3A+Replace+KafkaConsumer+with+AdminClient+in+GetOffsetShell
 


The direct intention of this is to support max timestamp in GetOffsetShell. 
This seems like a simple change, but there are some things to consider since it 
will change the --command-config parameter.

Let me know what you think.


Thanks,
Ziming Deng


[DISCUSS] KIP-813 Shared State Stores

2022-01-16 Thread Daan Gertis
Hey everyone,

Just created a KIP on sharing statestore state across multiple applications 
without duplicating the data on multiple changelog topics. Have a look and tell 
me what you think or what to improve. This is my first one, so please be gentle 


https://cwiki.apache.org/confluence/x/q53kCw
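
As a rough sketch of the intended usage (the method name and parameter shape 
follow the overloads discussed later in the thread; topic and store names are 
made up, and this is illustrative only):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    final Topology topology = new Topology();
    topology.addReadOnlyStateStore(
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("shared-store"),
                Serdes.String(), Serdes.String())
            .withLoggingDisabled(),           // no second changelog topic
        "shared-store-source",
        Serdes.String().deserializer(),
        Serdes.String().deserializer(),
        "app-a-store-changelog",              // the topic app A already writes
        "shared-store-updater",
        () -> new Processor<String, String, Void, Void>() {
            private KeyValueStore<String, String> store;

            @Override
            public void init(final ProcessorContext<Void, Void> context) {
                store = context.getStateStore("shared-store");
            }

            @Override
            public void process(final Record<String, String> record) {
                // The single writer that feeds the store.
                store.put(record.key(), record.value());
            }
        });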

Cheers!
D.


Permission to contribute

2022-01-14 Thread Daan Gertis
Hey everyone,

I would like to contribute a KIP that I think makes sense when building 
event-driven architectures at scale. I have been working with Kafka for the 
past 6 years or so (I know, a shame I wasn’t on here sooner) and am currently with 
a startup building a data platform at peta-scale. We keep all our data within 
Kafka (using tiered storage) and are running into some very specific issues 
when working at this scale.

Instead of waiting for others to contribute changes, I would like to get 
involved, not only formulating KIPs but also helping to implement 
them.

I have talked to Matthias J. Sax prior to this, and he was the one who actually 
suggested I create a KIP for what I’m trying to accomplish. Hence my 
request.

My Wiki ID: calmera
My Jira ID: calmera

Cheers!
D.