This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 5015183c1c3 KAFKA-19242: Fix commit bugs caused by race condition
during rebalancing. (#19631)
5015183c1c3 is described below
commit 5015183c1c38740900552e8cf1c0e491c038b2df
Author: ChickenchickenLove <[email protected]>
AuthorDate: Tue May 13 00:01:29 2025 +0900
KAFKA-19242: Fix commit bugs caused by race condition during rebalancing.
(#19631)
### Motivation
While investigating “events skipped in group rebalancing”
([spring‑projects/spring‑kafka#3703](https://github.com/spring-projects/spring-kafka/issues/3703)),
I discovered a race condition between
- the main poll/commit thread, and
- the consumer‑coordinator heartbeat thread.
If the main thread enters
`ConsumerCoordinator.sendOffsetCommitRequest()` while the heartbeat
thread is finishing a rebalance (`SyncGroupResponseHandler.handle()`),
the group state transitions in the following order:
```
COMPLETING_REBALANCE → (race window) → STABLE
```
Because the main thread reads the state twice without holding a lock, the
following interleaving is possible:
1. `generationIfStable()` returns `null` (state still
`COMPLETING_REBALANCE`),
2. the heartbeat thread flips the state to `STABLE`,
3. the main thread re‑checks with `rebalanceInProgress()` and wrongly
decides that a rebalance is still active,
4. a spurious `CommitFailedException` is returned even though the commit
could succeed.
For more details, refer to the sequence diagram:
https://github.com/user-attachments/assets/90f19af5-5e2d-4566-aece-ef764df2d89c
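The interleaving above can be reproduced deterministically with a small model. This is only a sketch: `CommitRaceSketch`, its two-value `State` enum, and the `betweenReads` hook are hypothetical names, not Kafka code. The hook stands in for the heartbeat thread running between the two unsynchronized reads.

```java
public class CommitRaceSketch {
    // Simplified model: the real coordinator has more states than these two.
    enum State { COMPLETING_REBALANCE, STABLE }
    enum Outcome { SEND_COMMIT, REBALANCE_IN_PROGRESS, COMMIT_FAILED }

    private volatile State state = State.COMPLETING_REBALANCE;

    // Stand-in for the heartbeat thread finishing the rebalance.
    void completeRebalance() {
        state = State.STABLE;
    }

    // First read, analogous to checking whether a stable generation exists.
    boolean isStable() {
        return state == State.STABLE;
    }

    // Second read, analogous to asking whether a rebalance is still running.
    boolean rebalanceInProgress() {
        return state == State.COMPLETING_REBALANCE;
    }

    // Check-then-act without a lock: betweenReads runs inside the race window.
    Outcome classifyCommit(Runnable betweenReads) {
        if (isStable()) {
            return Outcome.SEND_COMMIT;
        }
        betweenReads.run();
        return rebalanceInProgress()
            ? Outcome.REBALANCE_IN_PROGRESS
            : Outcome.COMMIT_FAILED;
    }

    public static void main(String[] args) {
        CommitRaceSketch quiet = new CommitRaceSketch();
        // No interference: the retriable outcome is chosen.
        System.out.println(quiet.classifyCommit(() -> { }));

        CommitRaceSketch racy = new CommitRaceSketch();
        // State flips between the two reads: the fatal outcome is chosen
        // even though the group is stable by now.
        System.out.println(racy.classifyCommit(racy::completeRebalance));
    }
}
```

In this model the second call reports `COMMIT_FAILED` although the state is already `STABLE` when the method returns, which mirrors step 4 of the list.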
### Impact
- The exception is semantically wrong: the consumer is in a stable
group, but reports failure.
- Frameworks and applications that rely on the semantics of
`CommitFailedException` and `RetriableCommitFailedException` (for example
`Spring Kafka`) take the wrong code path, which can ultimately skip
events and break “at‑least‑once” guarantees.
### Fix
We enlarge the synchronized block in
`ConsumerCoordinator.sendOffsetCommitRequest()` so that the consumer
group state is examined atomically with respect to the heartbeat thread.
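The effect of holding one monitor across both reads can be sketched as follows. The class `SynchronizedCommitSketch`, the helper `demo()`, and the two-value `State` enum are hypothetical and only model the idea; the actual change is the diff in this commit. The sketch assumes the heartbeat side updates the state under the same monitor.

```java
public class SynchronizedCommitSketch {
    // Simplified model: the real coordinator has more states than these two.
    enum State { COMPLETING_REBALANCE, STABLE }
    enum Outcome { SEND_COMMIT, REBALANCE_IN_PROGRESS, COMMIT_FAILED }

    private State state = State.COMPLETING_REBALANCE; // guarded by "this"

    // Stand-in for the heartbeat thread; it needs the same monitor.
    synchronized void completeRebalance() {
        state = State.STABLE;
    }

    synchronized State currentState() {
        return state;
    }

    // Both reads happen while the monitor is held, so the heartbeat thread
    // cannot change the state between them.
    Outcome classifyCommit(Thread heartbeat) throws InterruptedException {
        synchronized (this) {
            if (state == State.STABLE) {
                return Outcome.SEND_COMMIT;
            }
            heartbeat.start();
            // Times out: the heartbeat is blocked waiting for our monitor.
            heartbeat.join(100);
            return state == State.COMPLETING_REBALANCE
                ? Outcome.REBALANCE_IN_PROGRESS
                : Outcome.COMMIT_FAILED;
        }
    }

    // Runs one commit attempt with a competing heartbeat thread.
    static String demo() {
        SynchronizedCommitSketch coordinator = new SynchronizedCommitSketch();
        Thread heartbeat = new Thread(coordinator::completeRebalance);
        try {
            Outcome outcome = coordinator.classifyCommit(heartbeat);
            heartbeat.join(); // the heartbeat proceeds once the monitor is free
            return outcome + " then " + coordinator.currentState();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "REBALANCE_IN_PROGRESS then STABLE"
    }
}
```

The commit attempt now sees a consistent snapshot: it reports the retriable outcome, and the state only becomes `STABLE` after the synchronized block exits.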
### Jira
https://issues.apache.org/jira/browse/KAFKA-19242
https://github.com/spring-projects/spring-kafka/issues/3703
Signed-off-by: chickenchickenlove <[email protected]>
Reviewers: David Jacot <[email protected]>
---
.../consumer/internals/ConsumerCoordinator.java | 36 ++++++++++++----------
1 file changed, 19 insertions(+), 17 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 584a03736f9..8d54c871bb9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1263,23 +1263,25 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         final Generation generation;
         final String groupInstanceId;
         if (subscriptions.hasAutoAssignedPartitions()) {
-            generation = generationIfStable();
-            groupInstanceId = rebalanceConfig.groupInstanceId.orElse(null);
-            // if the generation is null, we are not part of an active group (and we expect to be).
-            // the only thing we can do is fail the commit and let the user rejoin the group in poll().
-            if (generation == null) {
-                log.info("Failing OffsetCommit request since the consumer is not part of an active group");
-
-                if (rebalanceInProgress()) {
-                    // if the client knows it is already rebalancing, we can use RebalanceInProgressException instead of
-                    // CommitFailedException to indicate this is not a fatal error
-                    return RequestFuture.failure(new RebalanceInProgressException("Offset commit cannot be completed since the " +
-                        "consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance " +
-                        "by calling poll() and then retry the operation."));
-                } else {
-                    return RequestFuture.failure(new CommitFailedException("Offset commit cannot be completed since the " +
-                        "consumer is not part of an active group for auto partition assignment; it is likely that the consumer " +
-                        "was kicked out of the group."));
+            synchronized (ConsumerCoordinator.this) {
+                generation = generationIfStable();
+                groupInstanceId = rebalanceConfig.groupInstanceId.orElse(null);
+                // if the generation is null, we are not part of an active group (and we expect to be).
+                // the only thing we can do is fail the commit and let the user rejoin the group in poll().
+                if (generation == null) {
+                    log.info("Failing OffsetCommit request since the consumer is not part of an active group");
+
+                    if (rebalanceInProgress()) {
+                        // if the client knows it is already rebalancing, we can use RebalanceInProgressException instead of
+                        // CommitFailedException to indicate this is not a fatal error
+                        return RequestFuture.failure(new RebalanceInProgressException("Offset commit cannot be completed since the " +
+                            "consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance " +
+                            "by calling poll() and then retry the operation."));
+                    } else {
+                        return RequestFuture.failure(new CommitFailedException("Offset commit cannot be completed since the " +
+                            "consumer is not part of an active group for auto partition assignment; it is likely that the consumer " +
+                            "was kicked out of the group."));
+                    }
                 }
             }
         } else {