[ https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15301027#comment-15301027 ]
Edoardo Comar commented on KAFKA-3727: -------------------------------------- Thanks [~ijuma] those issues are related. However [KAFKA-3177|https://issues.apache.org/jira/browse/KAFKA-3177] does not mention poll() which is the most basic api call for a consumer. And [KAFKA-2391|https://issues.apache.org/jira/browse/KAFKA-2391] mentions adding timeouts as arguments in the API, and such an argument is already present in KafkaConsumer.poll (timeout) . This issue is that poll (timeout) will block FOREVER, when a consumer is assigned a non non-existent TopicPartition. > Consumer.poll() stuck in loop on non-existent topic manually assigned > --------------------------------------------------------------------- > > Key: KAFKA-3727 > URL: https://issues.apache.org/jira/browse/KAFKA-3727 > Project: Kafka > Issue Type: Bug > Components: clients > Reporter: Edoardo Comar > Priority: Critical > > The behavior of a consumer on poll() for a non-existing topic is surprisingly > different/inconsistent > between a consumer that subscribed to the topic and one that had the > topic-partition manually assigned. > The "subscribed" consumer will return an empty collection > The "assigned" consumer will *loop forever* - this feels a bug to me. > sample snippet to reproduce: > {quote} > KafkaConsumer<String, String> assignKc = new KafkaConsumer<>(props1); > KafkaConsumer<String, String> subsKc = new KafkaConsumer<>(props2); > List<TopicPartition> tps = new ArrayList<>(); > tps.add(new TopicPartition("topic-not-exists", 0)); > assignKc.assign(tps); > subsKc.subscribe(Arrays.asList("topic-not-exists")); > System.out.println("********* subscribe k consumer "); > ConsumerRecords<String, String> crs2 = subsKc.poll(1000L); > print("subscribeKc", crs2); // returns empty > System.out.println("********* assign k consumer "); > ConsumerRecords<String, String> crs1 = assignKc.poll(1000L); > // will loop forever ! > print("assignKc", crs1); > {quote} > the logs for the "assigned" consumer show: > [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to > Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) > (org.apache.kafka.clients.Metadata) > [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for > fetching offset, wait for metadata refresh > (org.apache.kafka.clients.consumer.internals.Fetcher) > [2016-05-18 17:33:10,010] DEBUG Sending metadata request > {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient) > [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation > id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} > (org.apache.kafka.clients.NetworkClient) -- This message was sent by Atlassian JIRA (v6.3.4#6332)