[ 
https://issues.apache.org/jira/browse/KAFKA-17581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans resolved KAFKA-17581.
------------------------------------
    Resolution: Fixed

> AsyncKafkaConsumer can't unsubscribe invalid topics
> ---------------------------------------------------
>
>                 Key: KAFKA-17581
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17581
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: PoAn Yang
>            Assignee: PoAn Yang
>            Priority: Major
>              Labels: consumer-threading-refactor, kip-848-client-support
>
> When consumer subscribes an invalid topic name like " this is test", classic 
> consumer can unsubscribe without error. However, async consumer can't. We can 
> use following integration test to validate:
>  
> {code:java}
> @ParameterizedTest(name = 
> TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
> @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
> def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = {
>   // Invalid topic name due to space
>   val invalidTopicName = "topic abc"
>   val consumer = createConsumer()
>   consumer.subscribe(List(invalidTopicName).asJava)
>   var exception : InvalidTopicException = null
>   TestUtils.waitUntilTrue(() => {
>     try consumer.poll(Duration.ofMillis(500)) catch {
>       case e : InvalidTopicException => exception = e
>       case e : Throwable => fail(s"An InvalidTopicException should be thrown. 
> But ${e.getClass} is thrown")
>     }
>     exception != null
>   }, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.")
>   assertEquals(s"Invalid topics: [${invalidTopicName}]", exception.getMessage)
>   // AsyncKafkaConsumer sends request in background thread. Wait enough time 
> to send next request.
>   Thread.sleep(1000)
>   assertDoesNotThrow(new Executable {
>     override def execute(): Unit = consumer.unsubscribe()
>   })
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to