This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 62bec20aefc KAFKA-19242: Fix commit bugs caused by race condition during rebalancing. (#19631)
62bec20aefc is described below

commit 62bec20aefcdb94e8a66f5e3f74e40916981912a
Author: ChickenchickenLove <[email protected]>
AuthorDate: Tue May 13 00:01:29 2025 +0900

    KAFKA-19242: Fix commit bugs caused by race condition during rebalancing. (#19631)
    
    ### Motivation
    While investigating “events skipped in group rebalancing”
    ([spring‑projects/spring‑kafka#3703](https://github.com/spring-projects/spring-kafka/issues/3703)),
    I discovered a race condition between
    - the main poll/commit thread, and
    - the consumer‑coordinator heartbeat thread.
    
    If the main thread enters
    `ConsumerCoordinator.sendOffsetCommitRequest()` while the heartbeat
    thread is finishing a rebalance (`SyncGroupResponseHandler.handle()`),
    the group state transitions in the following order:
    
    ```
    COMPLETING_REBALANCE  →  (race window)  →  STABLE
    ```
    Because we read the state twice without a lock:
    1. `generationIfStable()` returns `null` (state still
    `COMPLETING_REBALANCE`),
    2. the heartbeat thread flips the state to `STABLE`,
    3. the main thread re‑checks with `rebalanceInProgress()`, which now
    returns `false`, and wrongly concludes that the consumer is no longer
    part of an active group,
    4. a spurious `CommitFailedException` is returned even though the commit
    could succeed.
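    
    The interleaving above can be replayed deterministically in a small
    model. This is only a sketch: `RaceSketch`, its `MemberState` enum, and
    the `betweenReads` hook are hypothetical stand-ins rather than the real
    Kafka classes, and the hook simply plays the role of the heartbeat thread
    running between the two state reads.
    
    ```java
    /** Hypothetical model of the two unsynchronized state reads; not Kafka code. */
    final class RaceSketch {
        enum MemberState { PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }
    
        // The member starts in the middle of a rebalance.
        private volatile MemberState state = MemberState.COMPLETING_REBALANCE;
    
        // Modeled on the described behavior: a generation only when STABLE, else null.
        Integer generationIfStable() {
            return state == MemberState.STABLE ? Integer.valueOf(1) : null;
        }
    
        boolean rebalanceInProgress() {
            return state == MemberState.PREPARING_REBALANCE
                || state == MemberState.COMPLETING_REBALANCE;
        }
    
        // Stand-in for the heartbeat thread finishing the rebalance.
        void completeRebalance() {
            state = MemberState.STABLE;
        }
    
        // Check-then-act with no lock spanning both reads. The hook runs
        // between the reads to simulate the other thread's state change.
        String commitOutcome(Runnable betweenReads) {
            Integer generation = generationIfStable();   // first read
            betweenReads.run();                          // race window
            if (generation == null) {
                return rebalanceInProgress()             // second read
                    ? "RebalanceInProgressException"
                    : "CommitFailedException";
            }
            return "commit sent";
        }
    }
    ```
    
    In this model, `commitOutcome(sketch::completeRebalance)` yields
    `"CommitFailedException"`, while a no-op hook yields
    `"RebalanceInProgressException"`.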
    
    For more details, please refer to the sequence diagram below.
    <img width="1494" alt="image" src="https://github.com/user-attachments/assets/90f19af5-5e2d-4566-aece-ef764df2d89c" />
    
    ### Impact
    - The exception is semantically wrong: the consumer is in a stable
    group, but reports failure.
    - Frameworks and applications that rely on the semantics of
    `CommitFailedException` and `RetriableCommitFailedException` (for example
    `Spring Kafka`) take the wrong code path, which can ultimately skip
    events and break “at‑least‑once” guarantees.
    
    ### Fix
    We enlarge the synchronized block in
    `ConsumerCoordinator.sendOffsetCommitRequest()` so that the consumer
    group state is examined atomically with respect to the heartbeat thread.
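    
    The idea behind the fix can be illustrated with a small model in which
    both state reads happen under one monitor that the state change must also
    acquire. This is a sketch under stated assumptions, not the patched Kafka
    code: `AtomicCheckSketch`, its `MemberState` enum, and the in-method
    `heartbeat` thread are hypothetical names used only for illustration.
    
    ```java
    /** Hypothetical model of reading the state twice under one lock; not Kafka code. */
    final class AtomicCheckSketch {
        enum MemberState { PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }
    
        private MemberState state = MemberState.COMPLETING_REBALANCE;
    
        synchronized Integer generationIfStable() {
            return state == MemberState.STABLE ? Integer.valueOf(1) : null;
        }
    
        synchronized boolean rebalanceInProgress() {
            return state == MemberState.PREPARING_REBALANCE
                || state == MemberState.COMPLETING_REBALANCE;
        }
    
        // Stand-in for the heartbeat thread finishing the rebalance.
        synchronized void completeRebalance() {
            state = MemberState.STABLE;
        }
    
        synchronized MemberState state() {
            return state;
        }
    
        String commitOutcome() {
            Thread heartbeat = new Thread(this::completeRebalance);
            String outcome;
            try {
                synchronized (this) {
                    Integer generation = generationIfStable();   // first read
                    heartbeat.start();
                    // The heartbeat thread needs this monitor, so it stays
                    // blocked and the timed join below simply times out.
                    heartbeat.join(200);
                    if (generation != null) {
                        outcome = "commit sent";
                    } else {
                        outcome = rebalanceInProgress()          // second read, same state
                            ? "RebalanceInProgressException"
                            : "CommitFailedException";
                    }
                }
                heartbeat.join();   // the state change proceeds once the lock is released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return outcome;
        }
    }
    ```
    
    Here `commitOutcome()` reports `"RebalanceInProgressException"`, the
    retriable outcome, and the state only becomes `STABLE` after the
    synchronized block exits.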
    
    ### Jira
    https://issues.apache.org/jira/browse/KAFKA-19242
    
    https://github.com/spring-projects/spring-kafka/issues/3703
    
    Signed-off-by: chickenchickenlove <[email protected]>
    
    Reviewers: David Jacot <[email protected]>
---
 .../consumer/internals/ConsumerCoordinator.java    | 36 ++++++++++++----------
 1 file changed, 19 insertions(+), 17 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 9abed8f7fb4..75767726924 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1305,23 +1305,25 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         final Generation generation;
         final String groupInstanceId;
         if (subscriptions.hasAutoAssignedPartitions()) {
-            generation = generationIfStable();
-            groupInstanceId = rebalanceConfig.groupInstanceId.orElse(null);
-            // if the generation is null, we are not part of an active group (and we expect to be).
-            // the only thing we can do is fail the commit and let the user rejoin the group in poll().
-            if (generation == null) {
-                log.info("Failing OffsetCommit request since the consumer is not part of an active group");
-
-                if (rebalanceInProgress()) {
-                    // if the client knows it is already rebalancing, we can use RebalanceInProgressException instead of
-                    // CommitFailedException to indicate this is not a fatal error
-                    return RequestFuture.failure(new RebalanceInProgressException("Offset commit cannot be completed since the " +
-                        "consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance " +
-                        "by calling poll() and then retry the operation."));
-                } else {
-                    return RequestFuture.failure(new CommitFailedException("Offset commit cannot be completed since the " +
-                        "consumer is not part of an active group for auto partition assignment; it is likely that the consumer " +
-                        "was kicked out of the group."));
+            synchronized (ConsumerCoordinator.this) {
+                generation = generationIfStable();
+                groupInstanceId = rebalanceConfig.groupInstanceId.orElse(null);
+                // if the generation is null, we are not part of an active group (and we expect to be).
+                // the only thing we can do is fail the commit and let the user rejoin the group in poll().
+                if (generation == null) {
+                    log.info("Failing OffsetCommit request since the consumer is not part of an active group");
+
+                    if (rebalanceInProgress()) {
+                        // if the client knows it is already rebalancing, we can use RebalanceInProgressException instead of
+                        // CommitFailedException to indicate this is not a fatal error
+                        return RequestFuture.failure(new RebalanceInProgressException("Offset commit cannot be completed since the " +
+                            "consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance " +
+                            "by calling poll() and then retry the operation."));
+                    } else {
+                        return RequestFuture.failure(new CommitFailedException("Offset commit cannot be completed since the " +
+                            "consumer is not part of an active group for auto partition assignment; it is likely that the consumer " +
+                            "was kicked out of the group."));
+                    }
                 }
             }
         } else {
