[ https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17745690#comment-17745690 ]
Matthias J. Sax edited comment on KAFKA-15116 at 7/21/23 6:18 PM:
------------------------------------------------------------------

{quote}The internal store is shared across stream threads.{quote}

That is not how Kafka Streams works. If you have a store, the store is sharded and each StreamThread has its own shard. A single key goes into a single shard (i.e., it must go into a single shard, otherwise you break the system), which is guaranteed by partitioning the data by key.

{quote}There is a consumer outside of kafka streams that is reading "read_committed" messages that populates the store and unblocks the processor.{quote}

Are you saying you are reading the corresponding changelog topic? That is not recommended in general, as it's considered an implementation detail. It's still not clear why anything would be "blocked" or how this external consumer would do the unblocking (blocking to me really means "to wait / to block the thread").

{quote}In this context I'm talking about eos and the transaction being committed and therefore the consumer being able to read the "read_committed" message.{quote}

Well yes, when Kafka Streams commits, all pending transactions are committed. So if you want to accumulate 3 messages for a key, but so far only 2 messages got processed, those 2 messages would be written into the state store and changelog topic on commit. But that is by design and correct. As said above, you should not read from the changelog topic. The right thing to do would be to change your processor so that it writes into an output topic only when all 3 messages are there (and never writes a partial result into this topic), and to read from this output topic instead of the changelog (in case I understood the scenario you describe correctly).
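The "complete results only" pattern recommended above can be sketched as plain logic. This is not the Kafka Streams Processor API; it is a hypothetical illustration of the shape of the idea (names like AccumulatingProcessor and EXPECTED_COUNT are made up, and the assumption that a result is complete after 3 messages comes from the discussion):

```python
# Hypothetical sketch: accumulate per-key state and forward a record
# downstream only once it is complete. Partial results stay in the state
# store (and its changelog) and never reach the output topic.

EXPECTED_COUNT = 3  # assumption from the discussion: 3 messages per key

class AccumulatingProcessor:
    def __init__(self):
        self.store = {}    # stands in for a Kafka Streams state store shard
        self.output = []   # stands in for the downstream output topic

    def process(self, key, value):
        pending = self.store.setdefault(key, [])
        pending.append(value)
        if len(pending) == EXPECTED_COUNT:
            # All pieces arrived: emit one complete record, clear the state.
            self.output.append((key, list(pending)))
            del self.store[key]
```

A downstream consumer would then read this output topic (with isolation.level=read_committed under EOS) rather than the changelog, and would only ever see complete results.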
{quote}I think ultimately our problem is that the stream thread carries on processing messages during a rebalance but does not complete them (transaction commit){quote}

I think you are making incorrect assumptions about how processing works (and what a transaction in Kafka is). A transaction is really just a guard against failures; it has no _semantic_ meaning in Kafka that would align with your business logic (there are no "begin TX" or "commit TX" calls exposed in Kafka Streams that you could use to align a TX with your business logic, and you don't have to).

{quote}Even though pausing processing during a rebalance probably shouldn't be default behaviour it would be ideal for us if it were configurable.{quote}

This was the old "eager rebalancing", and it was changed because there is actually no reason to "stop the world" during a rebalance. Also, I am not sure how it would help your case: even if we stopped processing during a rebalance, we would still need to commit the open TX when rebalancing starts. So nothing really changes.

{quote}Pausing consumption feels valid especially when there is a dependency between messages with the same partition key?{quote}

How should the system know if there is a dependency? It seems you are not writing your app in the proper way and may have incorrect assumptions about how Kafka is designed.
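The key-based sharding mentioned earlier in this comment can be illustrated with a toy partitioner. (Kafka's real default partitioner applies murmur2 to the serialized key; the hash below is just a deterministic stand-in for the sake of the sketch.)

```python
# Toy illustration of why one key always lands in one shard: routing is a
# deterministic function of the key, so all records with the same key go
# to the same partition, and therefore to the same task and the same
# state-store shard owned by one StreamThread.
import hashlib

def partition_for(key: str, num_partitions: int) -> int:
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions
```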
> Kafka Streams processing blocked during rebalance
> -------------------------------------------------
>
>                 Key: KAFKA-15116
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15116
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.0
>            Reporter: David Gammon
>            Priority: Major
>
> We have a Kafka Streams application that simply takes a message, processes it and then produces an event out the other side. The complexity is that there is a requirement that all events with the same partition key must be committed before the next message is processed.
>
> This works flawlessly most of the time, but we have started to see problems during deployments where the first message blocks the second message during a rebalance, because the first message isn't committed before the second message is processed. This ultimately results in transactions timing out and more rebalancing.
>
> We've tried lots of configuration to get the behaviour we require, with no luck. We've now put in a temporary fix so that Kafka Streams works with our framework, but it feels like this might be a missing feature or potentially a bug.
>
> +Example+
> Given:
> * We have two messages (InA and InB).
> * Both messages have the same partition key.
> * A rebalance is in progress so streams is no longer able to commit.
> When:
> # Message InA -> processor -> OutA (not committed)
> # Message InB -> processor -> blocked because #1 has not been committed

--
This message was sent by Atlassian Jira
(v8.20.10#820010)