[ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16116589#comment-16116589
 ] 

Balint Molnar commented on KAFKA-4879:
--------------------------------------

[~guozhang] I debugged this issue; my findings are the following:
* The topic's partition count needs to be larger than the num.partitions 
configuration value.
* This problem only occurs when delete.topic.enable is set to true.
* This problem only occurs when auto.create.topics.enable is set to true. (If 
it is set to false, it does not matter how many partitions are set in 
num.partitions: the position call will hang indefinitely.)
* The main problem is that during the consumer.position call the consumer 
recreates the topic with num.partitions partitions, and if that is smaller 
than the original topic's partition count, the call hangs indefinitely. 
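
To make the last point concrete, here is a rough, self-contained model of the 
retry behaviour. It is only a sketch and not the actual Fetcher code: the class 
PositionRetrySketch, its methods, the backoff parameter and the returned offset 
of 0 are all hypothetical. It assumes the topic originally had 3 partitions and 
was recreated with num.partitions=1, so partitions 1 and 2 never reappear. A 
finite timeout turns the hang into an exception; with Long.MAX_VALUE the loop 
would never exit for those partitions.

```java
import java.util.concurrent.TimeoutException;

/**
 * Simplified model of a position lookup that retries while a partition is unknown.
 * NOT the real Kafka Fetcher; every name in this class is hypothetical.
 */
final class PositionRetrySketch {

    /** Partition count of the topic as the simulated broker currently reports it. */
    private final int currentPartitionCount;

    PositionRetrySketch(int currentPartitionCount) {
        this.currentPartitionCount = currentPartitionCount;
    }

    /** Simulated metadata lookup: a partition is known only if its index still exists. */
    boolean partitionExists(int partition) {
        return partition >= 0 && partition < currentPartitionCount;
    }

    /**
     * Retries the lookup until the partition is known or timeoutMs has elapsed.
     * With timeoutMs == Long.MAX_VALUE and a partition that never comes back,
     * this loop never terminates, which models the reported hang.
     */
    long position(int partition, long timeoutMs, long retryBackoffMs)
            throws TimeoutException, InterruptedException {
        long start = System.nanoTime();
        while (true) {
            if (partitionExists(partition)) {
                return 0L; // placeholder offset; the model does not track real offsets
            }
            long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
            if (elapsedMs >= timeoutMs) {
                throw new TimeoutException("partition " + partition + " still unknown after "
                        + timeoutMs + " ms");
            }
            // Wait before the next attempt, but never past the deadline.
            Thread.sleep(Math.min(retryBackoffMs, timeoutMs - elapsedMs));
        }
    }

    public static void main(String[] args) throws Exception {
        // The consumer was assigned partitions 0..2 of the original topic;
        // the recreated topic only has 1 partition.
        PositionRetrySketch recreated = new PositionRetrySketch(1);
        for (int p = 0; p < 3; p++) {
            try {
                System.out.println("test-" + p + " offset: " + recreated.position(p, 200L, 50L));
            } catch (TimeoutException e) {
                System.out.println("test-" + p + " gave up: " + e.getMessage());
            }
        }
    }
}
```

In this model only test-0 resolves; test-1 and test-2 give up after the 200 ms 
bound instead of blocking forever.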

> KafkaConsumer.position may hang forever when deleting a topic
> -------------------------------------------------------------
>
>                 Key: KAFKA-4879
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4879
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.2.0
>            Reporter: Shixiong Zhu
>            Assignee: Balint Molnar
>             Fix For: 1.0.0
>
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is 
> this line: 
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", so it will just retry forever on 
> UnknownTopicOrPartitionException.
> Here is a reproducer:
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
>   public static void main(String[] args) {
>     // Make sure "delete.topic.enable" is set to true.
>     // Please create the topic test with "3" partitions manually.
>     // The issue is gone when there is only one partition.
>     String topic = "test";
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "testgroup");
>     props.put("value.deserializer", StringDeserializer.class.getName());
>     props.put("key.deserializer", StringDeserializer.class.getName());
>     props.put("enable.auto.commit", "false");
>     KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
>     kc.subscribe(Collections.singletonList(topic));
>     kc.poll(0);
>     Set<TopicPartition> partitions = kc.assignment();
>     System.out.println("partitions: " + partitions);
>     kc.pause(partitions);
>     kc.seekToEnd(partitions);
>     System.out.println("please delete the topic in 30 seconds");
>     try {
>       // Sleep 30 seconds to give us enough time to delete the topic.
>       Thread.sleep(30000);
>     } catch (InterruptedException e) {
>       e.printStackTrace();
>     }
>     System.out.println("sleep end");
>     for (TopicPartition p : partitions) {
>       System.out.println(p + " offset: " + kc.position(p));
>     }
>     System.out.println("cannot reach here");
>     kc.close();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
