Thanks for updating the KIP.

Could you add more details about the signature of `addReadOnlyStateStore()` -- What parameters does it take? Are there any overloads taking different parameters? The KIP only contains a verbal description in the "Implementation Plan" section, which is hard to find and hard to read.
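
For example, I would expect something along the lines of the following
(just a sketch mirroring the existing `addGlobalStore()` signature; the
exact parameters are what the KIP should spell out):

    public synchronized <KIn, VIn> Topology addReadOnlyStateStore(
        final StoreBuilder<?> storeBuilder,
        final String sourceName,
        final Deserializer<KIn> keyDeserializer,
        final Deserializer<VIn> valueDeserializer,
        final String topic,
        final String processorName,
        final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier);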

The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?

About timestamp synchronization: why do you propose to disable timestamp synchronization (similar to global state stores)? It seems to be an unnecessary limitation. Given that we could re-use the new method for source `KTables` (ie, the `StreamsBuilder#table()` implementation), having timestamp synchronization enabled seems to be important.
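
For context, with "source `KTables`" I mean tables materialized directly
from an input topic, e.g. (topic, store name, and serdes made up):

    final StreamsBuilder builder = new StreamsBuilder();
    final KTable<String, Long> table = builder.table(
        "input-topic",
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("input-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Long()));

If the new method eventually backs this code path, updates from the
input topic should participate in regular timestamp synchronization.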


-Matthias


On 2/8/22 11:01 PM, Guozhang Wang wrote:
Daan,

Thanks for the replies, those make sense to me.

On Tue, Feb 8, 2022 at 7:24 AM Daan Gertis <dger...@korfinancial.com> wrote:

I just updated the KIP to reflect the things discussed in this thread.

As for your questions Guozhang:

1) How do we handle it if the num.partitions of app A's store changelog
is different from the num.tasks of app B's sub-topology with that
read-only store? Or are we going to let each task of B keep a whole copy
of the store of A by reading all of its changelog partitions, like
global stores?

Good question. Both need to be co-partitioned to have the data available.
Another option would be to use IQ to make the request, but that seems far
from ideal.

2) Are we trying to synchronize the store updates from the changelog to
app B's processing timelines, or, just like what we do for global
stores, do we just update the read-only stores async?

Pretty much the same as we do for global stores.

3) If the answer to both of the above questions is the latter, then
what's the main difference between adding a read-only store vs. adding a
global store?

I think that, because of the first answer, the behavior differs from
global stores.

Makes sense?

Cheers,

D.

From: Matthias J. Sax <mj...@apache.org>
Date: Thursday, 20 January 2022 at 21:12
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: Re: [DISCUSS] KIP-813 Shared State Stores
Any processor that would use that materialized, read-only statestore
would need to wait for the store to be restored. I can't find a way to make
that possible since processors can't wait for the statestore to be restored.

This is built into the runtime already. Nothing to worry about. It's
part of the regular restore logic -- as long as any store is restoring,
all processing is blocked.

Also, since the statestore would have logging disabled, it means there
is no initial restoration going on.

No. When we hook up the input topic as changelog (as the DSL does), we
restore from the input topic during the regular restore phase. The
restore logic does not even know it's reading from the input topic
rather than from a "*-changelog" topic.

Disabling changelogging only affects the write path (ie, `store.put()`),
but not the restore path, due to the internal "hookup" of the input
topic inside the restore logic.
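
To illustrate the user-facing side (a sketch; store name and serdes made
up):

    final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("shared-store"),
                Serdes.String(),
                Serdes.String())
            .withLoggingDisabled(); // suppresses writes to a changelog topic only

Even with `withLoggingDisabled()`, such a store is still restored on
startup when the input topic is registered as its changelog internally.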

It's not easy to find/understand by reverse engineering I guess, but
it's there.

One pointer where the actual hookup happens (might help to dig into it
more if you want):

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L353-L356


-Matthias


On 1/20/22 10:04 AM, Guozhang Wang wrote:
Hello Daan,

Thanks for writing the KIP. I just read through it and just my 2c here:
to me it seems that one of the goals would be to "externalize" the
internal changelog topic of an application (say A) so that other
consumers can directly read it --- though technically, without any auth,
anyone knowing the topic name would be able to write to it too, so
conceptually we would just assume that app A is the only writer of that
topic --- The question I had is how much we want to externalize the
topic. For example we can, orthogonally to this KIP, just allow users to
pass in a customized topic name when constructing a state store,
indicating that application A should use it as the changelog. Since that
topic is created outside of A and is publicly visible on that cluster,
anyone --- including any consumers or other streams apps --- could read
it. This is probably the most flexible option for sharing, but we are
even less assured that application A is the only writer to that external
topic, unless we have explicit auth for A on that topic.

Aside from that, here are a few more detailed comments about the
implementation design itself, following your current proposal:

1) How do we handle it if the num.partitions of app A's store changelog
is different from the num.tasks of app B's sub-topology with that
read-only store? Or are we going to let each task of B keep a whole copy
of the store of A by reading all of its changelog partitions, like
global stores?
2) Are we trying to synchronize the store updates from the changelog to
app B's processing timelines, or, just like what we do for global
stores, do we just update the read-only stores async?
3) If the answer to both of the above questions is the latter, then
what's the main difference between adding a read-only store vs. adding a
global store?

Guozhang



On Thu, Jan 20, 2022 at 6:27 AM Daan Gertis <dger...@korfinancial.com>
wrote:

Hey Matthias,

Thank you for that feedback, certainly some things to think about. Let
me add my thoughts:

+1 on simplifying the motivation. Was aiming to add more context but I
think you're right, bringing it back to the essence makes more sense.

I also follow the reasoning of not having leader and follower. Makes
sense to view it from a single app point of view.

As for the API method and its parameters, I wanted to stay close to the
API for adding a regular statestore, but I'm perfectly fine with
defining an addReadOnlyStateStore() method instead.

I agree the processor approach would be the most flexible one, and
surely it allows you to use a processor to base the statestore off an
existing topic. From what I understood from the codebase, there might be
a problem when using that statestore. Any processor that would use that
materialized, read-only statestore would need to wait for the store to
be restored. I can't find a way to make that possible since processors
can't wait for the statestore to be restored. Also, since the statestore
would have logging disabled, it means there is no initial restoration
going on. As you wrote, the DSL is already doing this, so I'm pretty
sure I'm missing something, just unable to find what exactly.

I will rewrite the parts in the KIP to make processor-based the
preferred choice, along with the changes to the motivation etc. The only
thing to figure out is the restoring behavior, to be sure processors of
the read-only statestore aren't working with stale data.

D.

-----Original Message-----
From: Matthias J. Sax <mj...@apache.org>
Sent: 19 January 2022 21:31
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-813 Shared State Stores

Daan,

thanks for the KIP. I personally find the motivation section a little
bit confusing. If I understand the KIP correctly, you want to read a
topic into a state store (ie, materialize it). This is already possible
today.

Of course, today a "second" changelog topic would be created. It seems
the KIP aims to avoid the additional changelog topic, and to allow
re-using the original input topic (this optimization is already
available for the DSL, but not for the PAPI).

If my observation is correct, we can simplify the motivation accordingly
(the fact that you want to use this feature to share state across
different applications more efficiently seems to be secondary, and we
could omit it IMHO to keep the motivation focused).

As a result, we also don't need the concept of "leader" and "follower".
In the end, Kafka Streams cannot reason about/enforce any usage patterns
across different apps; we can only guarantee stuff within a single
application (ie, don't create a changelog but reuse an input topic as
changelog). It would simplify the KIP if we removed these parts.



For the API, I am wondering why you propose to pass in `processorNames`?
To me, it seems more reasonable to pass a `ProcessorSupplier` instead
(similar to what we do for `addGlobalStore`)? The provided `Processor`
must implement a certain pattern, ie, take each input record and apply
it unmodified to the state store (ie, the Processor will be solely
responsible for maintaining the state store). We might also need to pass
other arguments, similar to `addGlobalStore`, into this method. (More
below.)
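
For illustration, such a state-update processor could look like the
following (a rough sketch against the new PAPI; store name and types are
made up):

    class StateUpdateProcessor implements Processor<String, String, Void, Void> {
        private KeyValueStore<String, String> store;

        @Override
        public void init(final ProcessorContext<Void, Void> context) {
            // the store must be connected to this processor by name
            store = context.getStateStore("shared-store");
        }

        @Override
        public void process(final Record<String, String> record) {
            // apply each input record unmodified to the state store
            store.put(record.key(), record.value());
        }
    }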

If other processors need to read the state store, they can be connected
to it explicitly via `connectProcessorAndStateStores()`? I guess a
hybrid approach to keep `processorName` would also be possible, but IMHO
all those should only _read_ the state store (but not modify it), to
keep a clear conceptual separation.
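
For example (processor and store names made up):

    topology.connectProcessorAndStateStores("read-side-processor", "shared-store");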

About the method name: wondering if we should use a different name to be
more explicit about what the method does? Maybe `addReadOnlyStateStore`?



Btw: please omit any code snippets and only put the newly added method
signature in the KIP.

What I don't yet understand is the section "Allow state stores to
continue listening for changes from their changelog". Can you elaborate?

About:

Since a changelog topic is created with the application id in its name,
it would allow us to check in the follower if the changelog topic starts
with our application id. If it doesn’t, we are not allowed to send a
log.

The DSL implements this differently, and just disables the changelog for
the state store (ie, for the "follower"). We could do the same thing
(either enforcing that the provided `StoreBuilder` has changelogging
disabled, or by just ignoring it and disabling it hard-coded).


Ie, overall I would prefer the "source-processor approach" that you put
into rejected alternatives. Note that the problem you call out, namely

Problem with this approach is the lack of having restoring support
within the state store

does not apply. A restore is absolutely possible and the DSL already
supports it.


Or is your concern with regard to performance? The "source-processor
approach" would have the disadvantage that input data is first
deserialized, fed into the Processor, and then serialized again when put
into the state store. Re-using the state restore code is a good idea
from a performance point of view, but it might require quite some
internal changes (your proposal to "not stop restoring" might not work
as it could trigger quite some undesired side-effects given the current
architecture of Kafka Streams).


-Matthias




On 1/16/22 11:52 PM, Daan Gertis wrote:
Hey everyone,

Just created a KIP on sharing statestore state across multiple
applications without duplicating the data on multiple changelog topics.
Have a look and tell me what you think or what to improve. This is my
first one, so please be gentle 😉

https://cwiki.apache.org/confluence/x/q53kCw

Cheers!
D.





