lianetm commented on code in PR #17516:
URL: https://github.com/apache/kafka/pull/17516#discussion_r1837107494
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1276,10 +1277,12 @@ private void releaseAssignmentAndLeaveGroup(final Timer
timer) {
UnsubscribeEvent unsubscribeEvent = new
UnsubscribeEvent(calculateDeadlineMs(timer));
applicationEventHandler.add(unsubscribeEvent);
try {
- // If users subscribe to an invalid topic name, they will get
InvalidTopicException in error events,
- // because network thread keeps trying to send MetadataRequest in
the background.
- // Ignore it to avoid unsubscribe failed.
- processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e
instanceof InvalidTopicException);
+ // If users subscribe to a topic with invalid name or without
permission, they will get some exceptions.
+ // Because network thread keeps trying to send MetadataRequest or
ConsumerGroupHeartbeatRequest in the background,
+ // there will be some error events in the background queue.
+ // When running close, these exceptions should be ignored, or
users can't close successfully.
+ processBackgroundEvents(unsubscribeEvent.future(), timer,
+ e -> e instanceof InvalidTopicException || e instanceof
GroupAuthorizationException);
Review Comment:
what about reverting these changes to keep this PR addressing the
unsubscribe only? (aligned with the PR name). I think it will be convenient
because there is another ongoing PR https://github.com/apache/kafka/pull/16686
changing the close, and this "releaseAssignmentAndLeaveGroup" does not
processBackgroundEvents anymore, so the issue you're trying to solve here
disappears (we might remain with the other issue of events unaware of metadata
errors, but that is already addressed in another PR
https://github.com/apache/kafka/pull/17440 too so let's address those concerns
there). Makes sense?
##########
core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala:
##########
@@ -132,6 +133,84 @@ class GroupAuthorizerIntegrationTest extends
BaseRequestTest {
assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
Review Comment:
this test passes for both consumers I expect right? We should enabled it for
both
##########
core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala:
##########
@@ -124,12 +128,56 @@ class GroupAuthorizerIntegrationTest extends
BaseRequestTest {
() => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
"message".getBytes)).get()).getCause
assertTrue(produceException.isInstanceOf[TopicAuthorizationException])
assertEquals(Set(topic),
produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala)
+ }
- val consumer = createConsumer(configsToRemove =
List(ConsumerConfig.GROUP_ID_CONFIG))
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testUnauthorizedConsumeUnsubscribe(quorum: String, groupProtocol:
String): Unit = {
+ val topic = "topic"
+ val topicPartition = new TopicPartition(topic, 0)
+
+ createTopic(topic, listenerName = interBrokerListenerName)
+
+ val consumer = createConsumer()
+ consumer.assign(List(topicPartition).asJava)
+ val consumeException = assertThrows(classOf[TopicAuthorizationException],
+ () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+ assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
+
+ assertThrows(classOf[GroupAuthorizationException],
+ () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+
+ // TODO: use background-event-queue-size metric to check there is
background event
+ Thread.sleep(3000)
+
+ assertDoesNotThrow(new Executable {
+ override def execute(): Unit = consumer.unsubscribe()
+ })
Review Comment:
> For ClassicKafkaConsumer#unsubscribe, it doesn't check future object from
maybeLeaveGroup, so unsubscribe doesn't throw exceptions
Agreed, the unused returned value is the reason why the error doesn't bubble
up. Still I think we should keep the test for both consumers (it does pass for
the classic I expect, this is just the reason why it passes)
> I have another question here. IIUC, we don't need to swallow
TopicAuthorizationException, because we don't check topic permission when doing
consumer group heartbeat request. So I think we only need to swallow
GroupAuthorizationException here. WDYT
The trick is that the client internally sends metadata request on poll, and
is on those that we could get the topic auth error (that would bubble up in the
consumer on processBackgroundEvents). Because of that I think we need to
consider that we could receive both (group auth received in a HB response,
topic auth received in a metadata response)
##########
core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala:
##########
@@ -132,6 +133,84 @@ class GroupAuthorizerIntegrationTest extends
BaseRequestTest {
assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+ def testConsumeUnsubscribeWithoutGroupPermission(quorum: String,
groupProtocol: String): Unit = {
+ val topic = "topic"
+
+ createTopic(topic, listenerName = interBrokerListenerName)
+
+ // allow topic read/write permission to poll/send record
+ addAndVerifyAcls(
+ Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW),
createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+ new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+ )
+ val producer = createProducer()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
"message".getBytes)).get()
+ producer.close()
+
+ // allow group read permission to join group
+ val group = "group"
+ addAndVerifyAcls(
+ Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+ new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
+ )
+
+ val props = new Properties()
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+ val consumer = createConsumer(configOverrides = props)
+ consumer.subscribe(List(topic).asJava)
+ TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
+
+ removeAndVerifyAcls(
+ Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+ new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
+ )
+
+ assertDoesNotThrow(new Executable {
+ override def execute(): Unit = consumer.unsubscribe()
+ })
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testConsumeCloseWithoutGroupPermission(quorum: String, groupProtocol:
String): Unit = {
Review Comment:
This test is valuable, but if we agree on leaving this PR for the
unsubscribe to leverage the close/callbacks changes in
https://github.com/apache/kafka/pull/16686 we should probably bring this test
back in a separate PR that one goes in? What do you think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]