[ https://issues.apache.org/jira/browse/KAFKA-6994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16501550#comment-16501550 ]
Ismael Juma commented on KAFKA-6994: ------------------------------------ This doesn't seem like a bug. The new poll method may return before the metadata request has completed if the timeout is low enough. This is covered in the relevant KIP, javadoc and upgrade notes. Would that explain the behaviour? > KafkaConsumer.poll throwing AuthorizationException timeout-dependent > -------------------------------------------------------------------- > > Key: KAFKA-6994 > URL: https://issues.apache.org/jira/browse/KAFKA-6994 > Project: Kafka > Issue Type: Bug > Reporter: Edoardo Comar > Priority: Minor > > With auto-topic creation enabled, when attempting to consume from a > non-existent topic, theĀ {{AuthorizationException}} may or may not be thrown > from {{poll(timeout)}} depending on the {{timeout}} value. > The issue can be recreated modifying a test in {{AuthorizerIntegrationTest}} > as below (see comment) to *not* add the needed acl and therefore expecting > the test to fail. > While the first {{poll}} call will always throw with a short timeout, the > second {{poll}} will not throw with the short timeout. > {code:java} > @Test > def testCreatePermissionOnClusterToReadFromNonExistentTopic() { > testCreatePermissionNeededToReadFromNonExistentTopic("newTopic", > Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), > Cluster) > } > private def testCreatePermissionNeededToReadFromNonExistentTopic(newTopic: > String, acls: Set[Acl], resType: ResourceType) { > val topicPartition = new TopicPartition(newTopic, 0) > val newTopicResource = new Resource(Topic, newTopic) > addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, > Read)), newTopicResource) > addAndVerifyAcls(groupReadAcl(groupResource), groupResource) > this.consumers.head.assign(List(topicPartition).asJava) > try { > this.consumers.head.poll(Duration.ofMillis(50L)); > Assert.fail("should have thrown Authorization Exception") > } catch { > case e: TopicAuthorizationException => > assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) > } > //val resource = if (resType == Topic) newTopicResource else > Resource.ClusterResource > // addAndVerifyAcls(acls, resource) > // need to use a larger timeout in this subsequent poll else it may not > cause topic auto-creation > // this can be verified by commenting the above addAndVerifyAcls line and > expecting this test to fail > this.consumers.head.poll(Duration.ofMillis(50L)); > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)