[ https://issues.apache.org/jira/browse/KAFKA-17581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lianet Magrans resolved KAFKA-17581.
------------------------------------
    Resolution: Fixed

> AsyncKafkaConsumer can't unsubscribe invalid topics
> ---------------------------------------------------
>
>                 Key: KAFKA-17581
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17581
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: PoAn Yang
>            Assignee: PoAn Yang
>            Priority: Major
>              Labels: consumer-threading-refactor, kip-848-client-support
>
> When a consumer subscribes to an invalid topic name such as " this is test", the
> classic consumer can unsubscribe without error, but the async consumer cannot.
> The following integration test reproduces the problem:
>
> {code:java}
> @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
> @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
> def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = {
>   // Invalid topic name due to space
>   val invalidTopicName = "topic abc"
>   val consumer = createConsumer()
>   consumer.subscribe(List(invalidTopicName).asJava)
>   var exception: InvalidTopicException = null
>   TestUtils.waitUntilTrue(() => {
>     try consumer.poll(Duration.ofMillis(500)) catch {
>       case e: InvalidTopicException => exception = e
>       case e: Throwable => fail(s"An InvalidTopicException should be thrown. But ${e.getClass} is thrown")
>     }
>     exception != null
>   }, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.")
>   assertEquals(s"Invalid topics: [${invalidTopicName}]", exception.getMessage)
>   // AsyncKafkaConsumer sends requests in a background thread. Wait long enough for the next request to be sent.
>   Thread.sleep(1000)
>   assertDoesNotThrow(new Executable {
>     override def execute(): Unit = consumer.unsubscribe()
>   })
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)