[ https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750506#comment-17750506 ]
Matthias J. Sax commented on KAFKA-15116:
-----------------------------------------

Are you saying you are using a custom store? For this case, it's your responsibility to make sure it works with Kafka Streams. If you violate assumptions Kafka Streams makes (one of them is that a task can access a store independently of all other tasks), all bets are off, unfortunately. Thus, you would need to change your store so that one task cannot block any other task from making progress.

{quote}It is the behaviour we have observed and validated with a fix to the core streams code.{quote}

Are you referring to

{quote}The fix is to add a check for rebalancing in the while loop in runOnce. This checks if a rebalancing is in progress and sets the numIterations to 0 to stop processing of messages. When it has rebalanced it sets numIterations back to 1.{quote}

from further above? It's not clear to me why this would help. In the end, when a rebalance starts, we might continue processing until we need to hand off a partition. For this case, we need to commit pending transactions first, and would afterwards start a new transaction for the partitions we keep.

{quote}Committing the open transactions is fine (if possible). The problem is the uncommitted transactions due to the rebalancing.{quote}

Not sure I can follow. An open and an uncommitted transaction are the same thing... When a rebalance starts, Kafka Streams commits all pending transactions first, and thus there should not be any pending transactions. Of course, as said above, a new TX might get started right away for all partitions we did not need to hand off, and processing would continue right away.

{quote}If we have two uncommitted transactions for the same partition key we end up in the blocking state because the second message cannot be processed because the first message hasn't been committed.{quote}

What do you mean by "two uncommitted transactions for the same partition key"? If there are two messages with the same key, they should be in the same input topic partition (the only exception would be some stateless processing, but in your case state is involved), which ensures that a single task (and thus a single thread) processes all records with that key, and thus there is only one transaction for this key. If you use a custom state store and violate this assumption, putting two records into different partitions so that they are potentially processed by two threads, and you thus create a deadlock on the state store when both threads try to access the same row for this key, that is an incorrect usage of Kafka Streams.

{quote}The old behaviour sounds like it would solve our problem. Is there a configuration option to switch this back on?{quote}

[~ableegoldman] might know if it's possible to switch off cooperative rebalancing, but again, it seems the issue is how you use Kafka Streams (maybe I am wrong) -- you should never block in a Processor (and in your case you may even end up in a deadlock until some timeout hits, if I understood you correctly). Also, even if it's possible to disable cooperative rebalancing, the old behavior is effectively deprecated, and eager rebalancing will be completely removed in a future release.

{quote}To answer your question "How should the system know if there is a dependency?": Through configuration. I don't think anything that we are trying to do is going against how Kafka is designed. It might be non optimal and legacy but it does feel like something that streams should be flexible enough to handle. Why can't we choose to "stop the world"?{quote}

That is conceptually possible -- and even for cooperative rebalancing we could `pause()` all partitions and not process anything. But again, from what I understand so far, the issue is blocking in user code, not how Kafka Streams works.
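The same-key/same-partition guarantee referenced above can be illustrated with a minimal sketch. The class and method names here (`PartitionSketch`, `partitionFor`) are hypothetical, and `Arrays.hashCode` is used as a stand-in for the murmur2 hash that Kafka's default partitioner actually applies to the serialized key; the point is only that a deterministic hash of the key means identical keys always land in the same partition, and therefore in the same task, thread, and transaction:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {
    // Stand-in for the default partitioner: the real producer hashes the
    // serialized key with murmur2; Arrays.hashCode is used here for brevity.
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        // Mask the sign bit so the modulo result is a valid partition index.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("order-42", 6);
        int p2 = partitionFor("order-42", 6);
        // Identical keys always map to the same partition, so a single
        // Kafka Streams task (and thread) sees all records for a key.
        System.out.println(p1 == p2); // prints true
    }
}
```

Under this model, "two uncommitted transactions for the same key" cannot arise from Kafka Streams itself; it can only happen if keys are spread across partitions by some custom mechanism.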
> Kafka Streams processing blocked during rebalance
> -------------------------------------------------
>
> Key: KAFKA-15116
> URL: https://issues.apache.org/jira/browse/KAFKA-15116
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.5.0
> Reporter: David Gammon
> Priority: Major
>
> We have a Kafka Streams application that simply takes a message, processes it and then produces an event out the other side. The complexity is that there is a requirement that all events with the same partition key must be committed before the next message is processed.
> This works flawlessly most of the time, but we have started to see problems during deployments where the first message blocks the second message during a rebalance, because the first message isn't committed before the second message is processed. This ultimately results in transactions timing out and more rebalancing.
> We've tried lots of configuration to get the behaviour we require, with no luck. We've now put in a temporary fix so that Kafka Streams works with our framework, but it feels like this might be a missing feature or potentially a bug.
> +Example+
> Given:
> * We have two messages (InA and InB).
> * Both messages have the same partition key.
> * A rebalance is in progress so streams is no longer able to commit.
> When:
> # Message InA -> processor -> OutA (not committed)
> # Message InB -> processor -> blocked because #1 has not been committed

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
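For reference, the configuration angle touched on in the comment above can be sketched as a Streams properties fragment. This is illustrative only: `upgrade.from` is intended for rolling upgrades, and using it to retain the eager ("stop the world") rebalancing protocol long-term is a workaround rather than a supported mode, so its exact effect should be verified against the Kafka version in use:

```properties
# Illustrative Kafka Streams settings relevant to the discussion above.
application.id=my-streams-app
bootstrap.servers=localhost:9092

# Exactly-once processing: Streams commits pending transactions when a
# rebalance starts, so no transaction should remain open across a hand-off.
processing.guarantee=exactly_once_v2

# Setting upgrade.from to a pre-2.4 version keeps the old eager rebalancing
# protocol, but it exists only to support rolling upgrades; eager rebalancing
# is effectively deprecated and slated for removal.
upgrade.from=2.3
```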