Re: Thread Safety of Punctuator Functions and Processor Contexts

2022-11-08 Thread Joshua Suskalo
On Mon, Nov 7, 2022 at 8:05 PM Matthias J. Sax wrote:

> However, I don't think there is any guarantee that you will "see"
> concurrent modifications (IIRC, RocksDB uses snapshot isolation for
> iterators). But maybe that's good enough for you?

Yes, thankfully that is exactly what I'm after. I'm only looking for eventual
consistency here, so I'm perfectly fine with new entries being added and not
processed in an ongoing run of the punctuator code, only to be picked up the
next time around.
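
To make that trade-off concrete, here is a minimal plain-Java sketch of a
scan that reads from a point-in-time copy. It is only a model: a copied
TreeMap stands in for whatever snapshot behavior the real store's iterator
has, and SnapshotScanDemo, run and the onEach callback are names invented for
the illustration, not Kafka Streams or RocksDB APIs. An entry written during
a run is skipped by that run and picked up by the next one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Plain-Java model of a store whose scans read from a point-in-time snapshot.
// This is an assumption-laden stand-in, not Kafka Streams or RocksDB code.
class SnapshotScanDemo {
    private final TreeMap<String, Integer> store = new TreeMap<>();

    void put(String key, int value) {
        store.put(key, value);
    }

    // One "punctuator run": iterate over a copy taken when the run starts.
    // The hypothetical onEach callback may write to the live store mid-scan.
    List<String> run(Consumer<String> onEach) {
        Map<String, Integer> snapshot = new TreeMap<>(store); // point-in-time copy
        List<String> seen = new ArrayList<>();
        for (String key : snapshot.keySet()) {
            seen.add(key);
            onEach.accept(key);
        }
        return seen;
    }

    public static void main(String[] args) {
        SnapshotScanDemo demo = new SnapshotScanDemo();
        demo.put("a", 1);
        demo.put("b", 2);
        // A new entry arrives while the first run is still iterating.
        List<String> first = demo.run(key -> {
            if (key.equals("a")) {
                demo.put("c", 3);
            }
        });
        List<String> second = demo.run(key -> { });
        System.out.println(first);  // [a, b]
        System.out.println(second); // [a, b, c]
    }
}
```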

Thanks so much for the help!

Joshua


Re: Thread Safety of Punctuator Functions and Processor Contexts

2022-11-07 Thread Matthias J. Sax
Good point about the docs... I guess it must support concurrency due to
IQ, which might iterate over the store while `process()` modifies it. I
was only reasoning about `process()` and punctuation and forgot IQ.


So it seems we do indeed have this contract, which is good news for you.

However, I don't think there is any guarantee that you will "see"
concurrent modifications (IIRC, RocksDB uses snapshot isolation for
iterators). But maybe that's good enough for you?



-Matthias


Re: Thread Safety of Punctuator Functions and Processor Contexts

2022-11-07 Thread Joshua Suskalo
"Matthias J. Sax"  writes:

> In general, it's not safe to keep the iterator open, because when process() is
> executed in-between two punctuator calls, it might modify the store and
> invalidate the iterator. There is no guarantee that the returned iterator
> supports concurrency.

This makes sense, but unfortunately it makes it significantly harder to run
these operations concurrently, which is effectively a hard requirement for my
use case given how much data the state stores hold.

> Hence, even if it happens that the currently used iterator is concurrent,
> there is no API contract about it.

This surprises me though, because it seems to contradict what's stated in the
ReadOnlyKeyValueStore documentation[1].

> The returned iterator must be safe from ConcurrentModificationExceptions and
> must not return null values. Order is not guaranteed as bytes lexicographical
> ordering might not represent key order.

Maybe I'm reading this wrong, but it seems to imply that the returned iterator
must be safe in the face of concurrent modifications, which is required if you
are permitted to make a read-modify-write cycle while using the iterator, which
the existing version of my application has been doing correctly so far as I can
tell.
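
For reference, this is the kind of failure that doc sentence appears to rule
out, sketched with plain Java collections rather than a real state store
(IteratorSafetyDemo and its method are hypothetical names). A fail-fast
iterator such as TreeMap's throws as soon as the map is structurally modified
mid-iteration, while a weakly consistent one such as ConcurrentSkipListMap's
keeps going.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

class IteratorSafetyDemo {
    // Iterates the map and inserts a new key while the iterator is open.
    // Returns true if iteration completed, false if the iterator threw
    // ConcurrentModificationException.
    static boolean survivesInsertDuringIteration(Map<String, Integer> map) {
        map.put("a", 1);
        map.put("b", 2);
        try {
            Iterator<Map.Entry<String, Integer>> it = map.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Integer> entry = it.next();
                if (entry.getKey().equals("a")) {
                    map.put("z", 26); // structural change mid-iteration
                }
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // TreeMap iterators are fail-fast.
        System.out.println(survivesInsertDuringIteration(new TreeMap<>()));               // false
        // ConcurrentSkipListMap iterators are weakly consistent.
        System.out.println(survivesInsertDuringIteration(new ConcurrentSkipListMap<>())); // true
    }
}
```

Note that not throwing says nothing about whether the iterator will actually
return the newly inserted key; a weakly consistent iterator may or may not
reflect it.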

Am I vastly misunderstanding the intended use case for punctuators, and do I
need to find a different mechanism for performing periodic operations on a
data store?

[1]: 



Re: Thread Safety of Punctuator Functions and Processor Contexts

2022-11-07 Thread Matthias J. Sax
In general, it's not safe to keep the iterator open, because when 
process() is executed in-between two punctuator calls, it might modify 
the store and invalidate the iterator. There is no guarantee that the 
returned iterator supports concurrency.


Hence, even if it happens that the currently used iterator is 
concurrent, there is no API contract about it.


-Matthias


Re: Thread Safety of Punctuator Functions and Processor Contexts

2022-11-07 Thread Joshua Suskalo
Hello Matthias, thanks for the response!


"Matthias J. Sax"  writes:

> Spawning your own thread and calling context.forward() is _not_ safe, and
> there is currently no way for you to make it safe. The runtime code makes
> certain assumptions about being single-threaded, which would break if you
> called context.forward() from a different thread. (The runtime _always_
> assumes that context.forward() is called inside process() or inside the
> punctuation callback.)

This is what I expected, so I'm not too concerned here.

> The only way forward I can see would be to make each punctuation call
> shorter, e.g., by scanning only a small part of the store rather than
> the full store, so that the thread can get back to executing process()
> sooner, and to make the punctuation interval shorter. (It is of course
> an additional challenge to keep track of where you stopped the scan and
> to resume from there.)

This is where I have another question about safety. Is it safe to keep using
a KeyValueIterator that was obtained during a punctuator call after that call
has returned, so that the iterator itself tracks my position across multiple
runs? This seems to me to be the most obvious way to handle this.

A related question, if that one has an affirmative answer: can the
KeyValueIterator safely be handed to another thread (akin to Rust's Send
trait, in that I do not want concurrent access, only to use it from a
different thread)? If so, I could have a thread started by the punctuator
compute the messages that need to be sent and enqueue them on a
java.util.concurrent queue, and then send the already-computed messages in
later punctuator runs.
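
A rough sketch of that handoff, under the assumption that the worker is given
a private copy of the keys rather than the store iterator itself (which
sidesteps the iterator question instead of answering it). QueueHandoffDemo,
startWorker and drainAndForward are made-up names, and the forward parameter
is only a stand-in for context.forward(); nothing here touches the Kafka
Streams API. The worker only enqueues, and forwarding happens on whichever
thread calls drainAndForward, which in a real topology would be the
punctuator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

class QueueHandoffDemo {
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();

    // Runs on a background thread: computes messages from a private copy of
    // the keys and enqueues them. It never calls the forwarding callback.
    CompletableFuture<Void> startWorker(List<String> snapshotOfKeys) {
        return CompletableFuture.runAsync(() -> {
            for (String key : snapshotOfKeys) {
                pending.add("computed:" + key); // placeholder for real work
            }
        });
    }

    // Runs on the caller's thread (the punctuator, in the real setting):
    // drains at most maxPerRun messages and forwards them.
    int drainAndForward(int maxPerRun, Consumer<String> forward) {
        int sent = 0;
        String message;
        while (sent < maxPerRun && (message = pending.poll()) != null) {
            forward.accept(message);
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        QueueHandoffDemo demo = new QueueHandoffDemo();
        // Wait for the worker here only to keep the demo deterministic.
        demo.startWorker(List.of("k1", "k2", "k3")).join();

        List<String> forwarded = new ArrayList<>();
        System.out.println(demo.drainAndForward(2, forwarded::add)); // 2
        System.out.println(forwarded); // [computed:k1, computed:k2]
        System.out.println(demo.drainAndForward(2, forwarded::add)); // 1
    }
}
```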

> Hope this helps.

What you've said already is quite helpful and I will begin pursuing methods
based on what you've said, but I also hope that someone can answer my further
questions since they will be helpful to my implementation as well.

Thanks so much,
Joshua


Re: Thread Safety of Punctuator Functions and Processor Contexts

2022-11-04 Thread Matthias J. Sax
Your observation is correct. The Processor#process() and punctuation
callbacks are executed on a single thread. This is by design, to avoid
concurrency issues (writing thread-safe code is hard, and we want to
avoid putting this burden on the user). There are currently no plans to
make process() and punctuation concurrent, and doing so would require a
larger change inside the runtime code.


Spawning your own thread and calling context.forward() is _not_ safe,
and there is currently no way for you to make it safe. The runtime code
makes certain assumptions about being single-threaded, which would break
if you called context.forward() from a different thread. (The runtime
_always_ assumes that context.forward() is called inside process() or
inside the punctuation callback.)


The only way forward I can see would be to make each punctuation call
shorter, e.g., by scanning only a small part of the store rather than
the full store, so that the thread can get back to executing process()
sooner, and to make the punctuation interval shorter. (It is of course
an additional challenge to keep track of where you stopped the scan and
to resume from there.)
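
A minimal sketch of such a resumable, bounded scan, using a TreeMap as a
stand-in for the state store. ChunkedScanDemo, punctuate and batchSize are
names made up for the illustration; in Kafka Streams one would presumably
open a fresh range iterator on the store in each punctuation and close it
before returning, and the store's byte ordering may not match the natural key
order assumed here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class ChunkedScanDemo {
    private final NavigableMap<String, Integer> store = new TreeMap<>();
    // Key where the previous run stopped; null means "start from the beginning".
    private String lastKey = null;

    void put(String key, int value) {
        store.put(key, value);
    }

    // One punctuation: visit at most batchSize entries after lastKey, then
    // return so that regular processing can continue.
    List<String> punctuate(int batchSize) {
        NavigableMap<String, Integer> remaining =
                lastKey == null ? store : store.tailMap(lastKey, false);
        List<String> visited = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : remaining.entrySet()) {
            if (visited.size() == batchSize) {
                break;
            }
            visited.add(entry.getKey());
            lastKey = entry.getKey();
        }
        if (visited.size() < batchSize) {
            lastKey = null; // ran out of entries; wrap around on the next run
        }
        return visited;
    }

    public static void main(String[] args) {
        ChunkedScanDemo demo = new ChunkedScanDemo();
        for (String key : new String[] {"a", "b", "c", "d", "e"}) {
            demo.put(key, 1);
        }
        System.out.println(demo.punctuate(2)); // [a, b]
        demo.put("bb", 1);                     // an update arrives between runs
        System.out.println(demo.punctuate(2)); // [bb, c]
        System.out.println(demo.punctuate(2)); // [d, e]
        System.out.println(demo.punctuate(2)); // [] (end reached, cursor resets)
        System.out.println(demo.punctuate(2)); // [a, b]
    }
}
```

Because only the last key is remembered and no iterator is held between
runs, writes that happen between punctuations cannot invalidate anything; the
cost is one empty run whenever the remaining entries divide evenly into the
batch size.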


Hope this helps.

-Matthias



Thread Safety of Punctuator Functions and Processor Contexts

2022-11-03 Thread Joshua Suskalo
I have data that I am storing in a state store which I would like to
periodically run some code over, and the way I have decided to do this is
via a punctuator from inside a Transformer, which gets an iterator over the
state store, performs actions, and forwards events on.

So far, everything works fine, but I do have one issue: messages coming
into the transformer are intended to act as updates to individual values
stored in the state store, and should be incorporated as soon as possible,
but whenever the punctuator is running, the stream thread is tied up
traversing the iterator and cannot process new messages.

I have written the code in such a way that the transformation function for
these entities is thread safe with respect to the code in the punctuator,
so that the punctuator could fire from one thread while another thread
processes new events without issue. However, I did this under a mistaken
understanding of how stream threads are allocated: I had believed that
punctuators were fired from a different thread than the transform
method.

Inside the punctuator I use the ProcessorContext to forward additional
messages on, as well as in the transform method, and I would like to know
about the thread safety of having those two things happening concurrently
from separate threads, as might occur if I were to have the punctuator
start a thread to perform the iteration, rather than doing the iteration
itself. Is this a safe thing to do, or is this going to be prone to bugs,
even assuming that I have written code to ensure that no individual key in
the state store will be manipulated concurrently from both threads at once?

I've looked carefully through the documentation and searched online for
others doing similar things, and have come up empty-handed. While I am happy
to look through the Kafka Streams code to find out for myself, that will
take some time as I'm mostly unfamiliar with the codebase, and I was hoping
that I might be able to get an answer more quickly here.

Joshua