[ https://issues.apache.org/jira/browse/KAFKA-6994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503005#comment-16503005 ]
Edoardo Comar commented on KAFKA-6994: -------------------------------------- Thanks for the comments. Is it also expected that {{poll(long)}} ALWAYS waits for metadata the first time it's called, and then may or may not wait on subsequent calls ? What I observed running the test multiple times : with an insufficient acl, the first {{poll(50L)}} ALWAYS throws the {{AuthorizationException}} (else the test would fail on {{Assert.fail("should have thrown Authorization Exception")}} . Subsequent calls throw the exception with a sufficient timeout when repeated. > KafkaConsumer.poll throwing AuthorizationException timeout-dependent > -------------------------------------------------------------------- > > Key: KAFKA-6994 > URL: https://issues.apache.org/jira/browse/KAFKA-6994 > Project: Kafka > Issue Type: Bug > Reporter: Edoardo Comar > Priority: Minor > > With auto-topic creation enabled, when attempting to consume from a > non-existent topic, the {{AuthorizationException}} may or may not be thrown > from {{poll(timeout)}} depending on the {{timeout}} value. > The issue can be recreated modifying a test in {{AuthorizerIntegrationTest}} > as below (see comment) to *not* add the needed acl and therefore expecting > the test to fail. > While the first {{poll}} call will always throw with a short timeout, the > second {{poll}} will not throw with the short timeout. > {code:java} > @Test > def testCreatePermissionOnClusterToReadFromNonExistentTopic() { > testCreatePermissionNeededToReadFromNonExistentTopic("newTopic", > Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), > Cluster) > } > private def testCreatePermissionNeededToReadFromNonExistentTopic(newTopic: > String, acls: Set[Acl], resType: ResourceType) { > val topicPartition = new TopicPartition(newTopic, 0) > val newTopicResource = new Resource(Topic, newTopic) > addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, > Read)), newTopicResource) > addAndVerifyAcls(groupReadAcl(groupResource), groupResource) > this.consumers.head.assign(List(topicPartition).asJava) > try { > this.consumers.head.poll(Duration.ofMillis(50L)); > Assert.fail("should have thrown Authorization Exception") > } catch { > case e: TopicAuthorizationException => > assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) > } > //val resource = if (resType == Topic) newTopicResource else > Resource.ClusterResource > // addAndVerifyAcls(acls, resource) > // need to use a larger timeout in this subsequent poll else it may not > cause topic auto-creation > // this can be verified by commenting the above addAndVerifyAcls line and > expecting this test to fail > this.consumers.head.poll(Duration.ofMillis(50L)); > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)