This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0600abdde3b KAFKA-19300 AsyncConsumer#unsubscribe always timeout due
to GroupAuthorizationException (#19779)
0600abdde3b is described below
commit 0600abdde3bc14858d7c96581d56b0faa8a19574
Author: Nick Guo <[email protected]>
AuthorDate: Tue May 27 00:52:56 2025 +0800
KAFKA-19300 AsyncConsumer#unsubscribe always timeout due to
GroupAuthorizationException (#19779)
I verified the behavior by rewriting the
`GroupAuthorizerIntegrationTest` in Java in this PR:
https://github.com/apache/kafka/pull/19685. The state is now correct.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/consumer/internals/AbstractMembershipManager.java | 2 +-
.../scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala | 3 ++-
2 files changed, 3 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index 3c0d53bc9da..30d28fb722f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -443,7 +443,7 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
log.error("Member {} with epoch {} transitioned to fatal state",
memberId, memberEpoch);
notifyEpochChange(Optional.empty());
- if (previousState == MemberState.UNSUBSCRIBED) {
+ if (previousState == MemberState.UNSUBSCRIBED &&
maybeCompleteLeaveInProgress()) {
log.debug("Member {} with epoch {} got fatal error from the broker
but it already " +
"left the group, so onPartitionsLost callback won't be
triggered.", memberId, memberEpoch);
return;
diff --git
a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
index 8c8d05c6072..01d18114a04 100644
---
a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -34,7 +34,7 @@ import
org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
-import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
@@ -135,6 +135,7 @@ class GroupAuthorizerIntegrationTest extends
BaseRequestTest {
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
+ @Timeout(60)
def testConsumeUnsubscribeWithoutGroupPermission(groupProtocol: String):
Unit = {
val topic = "topic"