[ https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17745690#comment-17745690 ]

Matthias J. Sax edited comment on KAFKA-15116 at 7/21/23 6:18 PM:
------------------------------------------------------------------

{quote}The internal store is shared across stream threads.
{quote}
That is not how Kafka Streams works. If you have a store, the store is sharded 
and each StreamThread has its own shard. A single key goes into a single 
shard (i.e., it must go into a single shard, otherwise you break the system) 
because the data is partitioned by key.
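
For illustration, a minimal sketch of why a single key can only ever hit a single shard: the key's serialized bytes determine the partition, and each partition's store shard is owned by exactly one StreamThread at a time. The murmur2-based formula mirrors the producer's default key-to-partition mapping; the helper class itself is hypothetical.
{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

// Hypothetical illustration class, not part of Kafka Streams.
public class PartitionSketch {

    // Mirrors the default key-to-partition mapping: hash the serialized
    // key with murmur2 and take it modulo the partition count.
    // Same key => same partition => same store shard => same StreamThread.
    static int partitionFor(final byte[] serializedKey, final int numPartitions) {
        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
    }

    public static void main(final String[] args) {
        final byte[] key = "some-key".getBytes(StandardCharsets.UTF_8);
        // Always prints the same partition for this key, no matter how
        // often or from where it is called.
        System.out.println(partitionFor(key, 6));
    }
}
{code}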
{quote}There is a consumer outside of kafka streams that is reading 
"read_committed" messages that populates the store and unblocks the processor.
{quote}
Are you saying you are reading the corresponding changelog topic? That is not 
recommended in general, as it's considered an implementation detail. It's still 
not clear why anything would be "blocked" or how this external consumer would 
do the unblocking ("blocking" to me really means "to wait / block the thread").
{quote}In this context I'm talking about eos and the transaction being 
committed and therefore the consumer being able to read the "read_committed" 
message.
{quote}
Well yes, when Kafka Streams commits, all pending transactions are committed. 
So if you are saying you want to accumulate 3 messages for a key, but so far 
only 2 messages got processed, those 2 messages would be written into the state 
store and changelog topic on commit. But that is by design and correct. As said 
above, you should not read from the changelog topic. The right thing to do 
would be to change your processor to write into an output topic only when all 3 
messages are there (and never write a partial result into this topic), and to 
read from this output topic instead of the changelog (assuming I understood the 
scenario you describe correctly).
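
To make that concrete, here is a minimal sketch of such a processor using the Processor API. The store name, the String types, and the "3 messages" trigger are assumptions for illustration; the store would also need a serde for the list value.
{code:java}
import java.util.ArrayList;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical sketch: buffer records per key and forward a result
// downstream only once all 3 messages for the key have arrived, so the
// output topic never contains a partial result.
public class AccumulateThreeProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;
    private KeyValueStore<String, ArrayList<String>> buffer;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        this.buffer = context.getStateStore("buffer"); // store name is a placeholder
    }

    @Override
    public void process(final Record<String, String> record) {
        ArrayList<String> pending = buffer.get(record.key());
        if (pending == null) {
            pending = new ArrayList<>();
        }
        pending.add(record.value());

        if (pending.size() < 3) {
            // Partial result: keep it in the (sharded) store only.
            buffer.put(record.key(), pending);
            return;
        }

        // Complete result: clean up and forward. Only now does anything
        // become visible on the output topic.
        buffer.delete(record.key());
        context.forward(record.withValue(String.join(",", pending)));
    }
}
{code}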
{quote}I think ultimately our problem is that the stream thread carries on 
processing messages during a rebalance but does not complete them (transaction 
commit)
{quote}
I think you are making incorrect assumptions about how processing works (and 
what a transaction in Kafka is). A transaction is really just there to guard 
against failures; it has no _semantic_ meaning in Kafka that would align to 
your business logic (there are no "begin TX" or "commit TX" calls exposed in 
Kafka Streams that you could use to align transactions to your business logic, 
and you don't have to).
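
This is all the transaction-related surface Kafka Streams exposes: you enable EOS via configuration and optionally tune the commit cadence; there is nothing like beginTransaction()/commitTransaction() to call from user code. (Application id and bootstrap servers below are placeholders.)
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
// Turns on EOS; Streams manages the producer transactions internally.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// Controls how often Streams commits (and thus how often transactions
// are committed). This is a time-based cadence, not a business-logic boundary.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
{code}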
{quote}Even though pausing processing during a rebalance probably shouldn't be 
default behaviour it would be ideal for us if it were configurable.
{quote}
This was the old "eager rebalancing" behavior, and it was changed because there 
is actually no reason to "stop the world" during a rebalance. Also, I am not 
sure how it would help your case: even if we stopped processing during a 
rebalance, we would still need to commit the open transaction when the 
rebalance starts. So nothing really changes.
{quote}Pausing consumption feels valid especially when there is a dependency 
between messages with the same partition key?
{quote}
How should the system know whether there is a dependency? It seems you are not 
writing your app in the proper way and may have incorrect assumptions about how 
Kafka is designed?



> Kafka Streams processing blocked during rebalance
> -------------------------------------------------
>
>                 Key: KAFKA-15116
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15116
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.0
>            Reporter: David Gammon
>            Priority: Major
>
> We have a Kafka Streams application that simply takes a message, processes 
> it and then produces an event out the other side. The complexity is that 
> there is a requirement that all events with the same partition key must be 
> committed before the next message is processed.
> This works flawlessly most of the time, but we have started to see problems 
> during deployments where the first message blocks the second message during a 
> rebalance, because the first message isn’t committed before the second message 
> is processed. This ultimately results in transactions timing out and more 
> rebalancing.
> We’ve tried lots of configuration to get the behaviour we require with no 
> luck. We’ve now put in a temporary fix so that Kafka Streams works with our 
> framework but it feels like this might be a missing feature or potentially a 
> bug.
> +Example+
> Given:
>  * We have two messages (InA and InB).
>  * Both messages have the same partition key.
>  * A rebalance is in progress, so Streams is no longer able to commit.
> When:
>  # Message InA -> processor -> OutA (not committed)
>  # Message InB -> processor -> blocked because #1 has not been committed


