This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0600abdde3b KAFKA-19300 AsyncConsumer#unsubscribe always timeout due 
to GroupAuthorizationException (#19779)
0600abdde3b is described below

commit 0600abdde3bc14858d7c96581d56b0faa8a19574
Author: Nick Guo <[email protected]>
AuthorDate: Tue May 27 00:52:56 2025 +0800

    KAFKA-19300 AsyncConsumer#unsubscribe always timeout due to 
GroupAuthorizationException (#19779)
    
    I verified the behavior by rewriting the
    `GroupAuthorizerIntegrationTest` in Java in this PR:
    https://github.com/apache/kafka/pull/19685
    The state is now correct.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/consumer/internals/AbstractMembershipManager.java    | 2 +-
 .../scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala   | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index 3c0d53bc9da..30d28fb722f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -443,7 +443,7 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
         log.error("Member {} with epoch {} transitioned to fatal state", 
memberId, memberEpoch);
         notifyEpochChange(Optional.empty());
 
-        if (previousState == MemberState.UNSUBSCRIBED) {
+        if (previousState == MemberState.UNSUBSCRIBED && 
maybeCompleteLeaveInProgress()) {
             log.debug("Member {} with epoch {} got fatal error from the broker 
but it already " +
                     "left the group, so onPartitionsLost callback won't be 
triggered.", memberId, memberEpoch);
             return;
diff --git 
a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
index 8c8d05c6072..01d18114a04 100644
--- 
a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -34,7 +34,7 @@ import 
org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
 import org.apache.kafka.server.config.ServerConfigs
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.function.Executable
-import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 
@@ -135,6 +135,7 @@ class GroupAuthorizerIntegrationTest extends 
BaseRequestTest {
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
   @MethodSource(Array("getTestGroupProtocolParametersAll"))
+  @Timeout(60)
   def testConsumeUnsubscribeWithoutGroupPermission(groupProtocol: String): 
Unit = {
     val topic = "topic"
 

Reply via email to