[ https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15342295#comment-15342295 ]
ASF GitHub Bot commented on KAFKA-3727: --------------------------------------- GitHub user edoardocomar opened a pull request: https://github.com/apache/kafka/pull/1535 KAFKA-3727 - ConsumerListener for UnknownTopicOrPartitionException Added a ConsumerListener to KafkaConsumer Modified Fetcher and Cluster to account for Unknown Topics Added unit test You can merge this pull request into a Git repository by running: $ git pull https://github.com/edoardocomar/kafka KAFKA-3727b Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1535.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1535 ---- commit 68515ec1d945c438ebd0b41a64406eac5c971dbb Author: Edoardo Comar <eco...@uk.ibm.com> Date: 2016-06-20T22:16:23Z KAFKA-3727 - ConsumerListener for UnknownTopicOrPartitionException Added a ConsumerListener to KafkaConsumer Modified Fetcher and Cluster to account for Unknown Topics Added unit test ---- > Consumer.poll() stuck in loop on non-existent topic manually assigned > --------------------------------------------------------------------- > > Key: KAFKA-3727 > URL: https://issues.apache.org/jira/browse/KAFKA-3727 > Project: Kafka > Issue Type: Bug > Components: clients > Reporter: Edoardo Comar > Assignee: Edoardo Comar > Priority: Critical > > The behavior of a consumer on poll() for a non-existing topic is surprisingly > different/inconsistent > between a consumer that subscribed to the topic and one that had the > topic-partition manually assigned. > The "subscribed" consumer will return an empty collection > The "assigned" consumer will *loop forever* - this feels a bug to me. > sample snippet to reproduce: > {quote} > KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1); > KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2); > List<TopicPartition> tps = new ArrayList<>(); > tps.add(new TopicPartition("topic-not-exists", 0)); > assignKc.assign(tps); > subsKc.subscribe(Arrays.asList("topic-not-exists")); > System.out.println("********* subscribe k consumer "); > ConsumerRecords<String, String> crs2 = subsKc.poll(1000L); > print("subscribeKc", crs2); // returns empty > System.out.println("********* assign k consumer "); > ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); > // will loop forever ! > print("assignKc", crs1); > {quote} > the logs for the "assigned" consumer show: > [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to > Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) > (org.apache.kafka.clients.Metadata) > [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for > fetching offset, wait for metadata refresh > (org.apache.kafka.clients.consumer.internals.Fetcher) > [2016-05-18 17:33:10,010] DEBUG Sending metadata request > {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient) > [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation > id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} > (org.apache.kafka.clients.NetworkClient) -- This message was sent by Atlassian JIRA (v6.3.4#6332)