FrankYang0529 commented on code in PR #17516:
URL: https://github.com/apache/kafka/pull/17516#discussion_r1836296265
##########
core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala:
##########
@@ -124,12 +128,56 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
() => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()).getCause
assertTrue(produceException.isInstanceOf[TopicAuthorizationException])
assertEquals(Set(topic), produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala)
+ }
- val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testUnauthorizedConsumeUnsubscribe(quorum: String, groupProtocol: String): Unit = {
+ val topic = "topic"
+ val topicPartition = new TopicPartition(topic, 0)
+
+ createTopic(topic, listenerName = interBrokerListenerName)
+
+ val consumer = createConsumer()
+ consumer.assign(List(topicPartition).asJava)
+ val consumeException = assertThrows(classOf[TopicAuthorizationException],
+ () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+ assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
+
+ assertThrows(classOf[GroupAuthorizationException],
+ () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+
+ // TODO: use background-event-queue-size metric to check there is background event
+ Thread.sleep(3000)
+
+ assertDoesNotThrow(new Executable {
+ override def execute(): Unit = consumer.unsubscribe()
+ })
Review Comment:
Hi @lianetm, thanks for sharing the details about `ClassicKafkaConsumer`.
`ClassicKafkaConsumer#unsubscribe` doesn't check the future returned by
`maybeLeaveGroup`, so `unsubscribe` doesn't throw exceptions there. I've kept
the `unsubscribe` test for `AsyncKafkaConsumer` only, and kept the `close`
test for both consumers.

I have another question here. IIUC, we don't need to swallow
`TopicAuthorizationException`, because we don't check topic permissions for
the consumer group heartbeat request. So I think we only need to swallow
`GroupAuthorizationException` here. WDYT? Thanks.
https://github.com/apache/kafka/blob/9db5ed00a8369d5c696e836661230110ea2ea44d/core/src/main/scala/kafka/server/KafkaApis.scala#L3812-L3815
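
To illustrate the idea, here is a minimal Java sketch. It is not the actual
`AsyncKafkaConsumer` code: the two exception classes are local stand-ins for
the Kafka ones, and the `unsubscribe` helper, its future parameter, and its
boolean return value are hypothetical names chosen only for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class UnsubscribeSketch {
    // Hypothetical stand-ins for the Kafka exception types (assumption:
    // the real classes live in org.apache.kafka.common.errors).
    static class GroupAuthorizationException extends RuntimeException {
        GroupAuthorizationException(String msg) { super(msg); }
    }

    static class TopicAuthorizationException extends RuntimeException {
        TopicAuthorizationException(String msg) { super(msg); }
    }

    // Waits for a (hypothetical) leave-group future and swallows only
    // GroupAuthorizationException. Returns true if a failure was swallowed.
    static boolean unsubscribe(CompletableFuture<Void> leaveGroup) {
        try {
            leaveGroup.join();
            return false;
        } catch (CompletionException e) {
            if (e.getCause() instanceof GroupAuthorizationException) {
                // Leaving the group was denied; nothing more to do on unsubscribe.
                return true;
            }
            // Anything else, including TopicAuthorizationException, propagates.
            throw e;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> denied = new CompletableFuture<>();
        denied.completeExceptionally(new GroupAuthorizationException("not authorized"));
        System.out.println(unsubscribe(denied)); // prints "true"
    }
}
```

With this shape, a `TopicAuthorizationException` is never swallowed, which
matches the assumption that the heartbeat path cannot produce one.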