[ https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750506#comment-17750506 ]

Matthias J. Sax commented on KAFKA-15116:
-----------------------------------------

Are you saying you are using a custom store? In this case, it's your 
responsibility to make sure it works with Kafka Streams. If you violate 
assumptions Kafka Streams makes (one of them being that a task can access its store 
independently of all other tasks), all bets are off, unfortunately. Thus, you 
would need to change your store so that one task cannot block any other task 
from making progress.
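To illustrate that assumption, here is a minimal sketch (processor and store names 
are made up for this example, not taken from this ticket) of the task-scoped access 
pattern Kafka Streams expects: each task gets its own store instance from its own 
context and never coordinates with, or waits on, other tasks.
{code:java}
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical processor, only to show the expected pattern: the store instance
// belongs to this task and is never shared or locked across tasks/threads.
public class MyProcessor implements Processor<String, String, String, String> {
    private KeyValueStore<String, String> store;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        store = context.getStateStore("my-store"); // task-scoped store instance
    }

    @Override
    public void process(final Record<String, String> record) {
        store.put(record.key(), record.value()); // no blocking, no cross-task waits
    }
}
{code}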
{quote}It is the behaviour we have observed and validated with a fix to the 
core streams code. 
{quote}
Are you referring to 
{quote}The fix is to add a check for rebalancing in the while loop in runOnce. 
This checks if a rebalancing is in progress and sets the numIterations to 0 to 
stop processing of messages. When it has rebalanced it sets numIterations back 
to 1.
{quote}
from further above? It's not clear to me why this would help. In the end, when 
a rebalance starts, we might continue processing until we need to hand off a 
partition; at that point we need to commit pending transactions first, and 
would start a new transaction afterwards for the remaining partitions we keep.
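For clarity, my reading of the proposal is roughly the following (a paraphrased 
sketch, not the actual {{StreamThread#runOnce}} code; {{rebalanceInProgress()}} is a 
placeholder for however the thread would detect the rebalance):
{code:java}
// Paraphrased sketch of the proposed change, not actual Kafka Streams code.
if (rebalanceInProgress()) {
    numIterations = 0; // stop processing records while the rebalance is ongoing
} else if (numIterations == 0) {
    numIterations = 1; // resume processing once the rebalance has completed
}
// ... the existing poll/process loop would follow here ...
{code}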
{quote}Committing the open transactions is fine (if possible). The problem is 
the un committed transactions due to the rebalancing.
{quote}
Not sure if I can follow. An open and an uncommitted transaction are the same 
thing... When a rebalance starts, Kafka Streams commits all pending 
transactions first, and thus there should not be any pending transactions. Of 
course, as said above, a new TX might get started right away for all partitions 
we did not need to hand off, and processing would continue right away.
{quote}If we have two un committed transactions for the same partition key we 
end up in the blocking state because the second message cannot be processed 
because the first message hasn't been committed. 
{quote}
What do you mean by "two uncommitted transactions for the same partition key"? 
If there are two messages with the same key, they should be in the same input 
topic partition (the only exception would be some stateless processing, but in 
your case state is involved). This ensures that a single task (and thus a single 
thread) processes all records with that key, and thus there is only one 
transaction for this key. If you use a custom state store and violate this 
assumption, such that two records with the same key end up in different 
partitions, are potentially processed by two threads, and you create a deadlock 
on the state store when both threads try to access the same row for this key, 
that is an incorrect usage of Kafka Streams.
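To make the partitioning argument concrete: assuming the default partitioner is 
used for keyed records (and no custom {{Partitioner}}/{{StreamPartitioner}}), the 
target partition is a pure function of the key bytes, so two records with the same 
key always land in the same partition and thus in the same task. Sketch:
{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

// Why two records with the same key end up in the same partition: for keyed
// records the default partitioning is murmur2(keyBytes) % numPartitions.
public class SameKeySamePartition {
    public static void main(final String[] args) {
        final int numPartitions = 12; // example partition count
        final byte[] keyBytes = "some-key".getBytes(StandardCharsets.UTF_8);
        final int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        System.out.println("records with this key always go to partition " + partition);
    }
}
{code}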
{quote}The old behaviour sounds like it would solve our problem. Is there a 
configuration option to switch this back on?
{quote}
[~ableegoldman] might know if it's possible to switch off cooperative 
rebalancing, but again, it seems the issue is how you use Kafka Streams (maybe I 
am wrong): you should never block in a Processor (and in your case you may even 
end up in a deadlock until some timeout hits, if I understood you correctly). 
Also, even if it's possible to disable cooperative rebalancing, the old behavior 
is effectively deprecated and eager rebalancing will be removed completely in a 
future release.
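If you want to experiment with it anyway: my understanding is that setting 
{{upgrade.from}} to a pre-2.4 version keeps Kafka Streams on the old EAGER 
("stop the world") rebalance protocol, with all the caveats above. Sketch 
(application id and broker address are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
// A pre-2.4 upgrade.from keeps the eager (non-cooperative) rebalance protocol.
props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23);
{code}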
{quote}To answer your question "How should the system know if there is a 
dependency?": Through configuration. I don't think anything that we are trying 
to do is going against how Kafka is designed. It might be non optimal and 
legacy but it does feel like something that streams should be flexible enough 
to handle. Why can't we chose to "stop the world"?
{quote}
That is conceptually possible, and even for cooperative rebalancing we could 
`pause()` all partitions and not process anything. But again, from what I 
understand so far, the issue is blocking in user code, not how Kafka Streams 
works.
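Conceptually, that would look like the following on a plain consumer (a sketch 
only; this is not something Kafka Streams exposes as a user-facing switch today):
{code:java}
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch with a plain KafkaConsumer: pausing every assigned partition stops
// fetching (and thus processing) without leaving the consumer group.
void pauseAll(final KafkaConsumer<byte[], byte[]> consumer) {
    consumer.pause(consumer.assignment());
}

void resumeAll(final KafkaConsumer<byte[], byte[]> consumer) {
    consumer.resume(consumer.assignment());
}
{code}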

> Kafka Streams processing blocked during rebalance
> -------------------------------------------------
>
>                 Key: KAFKA-15116
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15116
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.0
>            Reporter: David Gammon
>            Priority: Major
>
> We have a Kafka Streams application that simply takes a message, processes 
> it and then produces an event out the other side. The complexity is that 
> there is a requirement that all events with the same partition key must be 
> committed before the next message is processed.
> This works flawlessly most of the time, but we have started to see problems 
> during deployments where the first message blocks the second message during a 
> rebalance, because the first message isn’t committed before the second message 
> is processed. This ultimately results in transactions timing out and more 
> rebalancing.
> We’ve tried lots of configuration to get the behaviour we require with no 
> luck. We’ve now put in a temporary fix so that Kafka Streams works with our 
> framework but it feels like this might be a missing feature or potentially a 
> bug.
> +Example+
> Given:
>  * We have two messages (InA and InB).
>  * Both messages have the same partition key.
>  * A rebalance is in progress so streams is no longer able to commit.
> When:
>  # Message InA -> processor -> OutA (not committed)
>  # Message InB -> processor -> blocked because #1 has not been committed


