Shixiong Zhu created KAFKA-4879:
-----------------------------------

             Summary: KafkaConsumer.position may hang forever when deleting a topic
                 Key: KAFKA-4879
                 URL: https://issues.apache.org/jira/browse/KAFKA-4879
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.10.2.0
            Reporter: Shixiong Zhu


KafkaConsumer.position may hang forever if the topic is deleted while the 
consumer is running. The problem is this line:
https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
The timeout passed there is "Long.MAX_VALUE", so the consumer retries 
indefinitely on UnknownTopicOrPartitionException and the call never returns.
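
To illustrate the failure mode, here is a minimal sketch of a retry loop with a deadline. It is not the Fetcher code: the class BoundedRetrySketch, the method lookupWithRetry, and the exception RetriableError are hypothetical names, with RetriableError standing in for UnknownTopicOrPartitionException. When the timeout is Long.MAX_VALUE the deadline check can never succeed, so a lookup that keeps failing with a retriable error loops forever. A finite timeout turns the same situation into a TimeoutException.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical sketch; none of these names come from the Kafka code base.
final class BoundedRetrySketch {

    /** Stand-in for a retriable error such as UnknownTopicOrPartitionException. */
    static final class RetriableError extends RuntimeException {
        RetriableError(String message) {
            super(message);
        }
    }

    /**
     * Retries the lookup until it succeeds or timeoutMs has elapsed.
     * With timeoutMs == Long.MAX_VALUE the deadline is never reached, so a
     * lookup that always fails with a retriable error never returns.
     */
    static long lookupWithRetry(Supplier<Long> lookup, long timeoutMs, long backoffMs)
            throws TimeoutException, InterruptedException {
        long start = System.currentTimeMillis();
        while (true) {
            try {
                return lookup.get();
            } catch (RetriableError e) {
                long elapsed = System.currentTimeMillis() - start;
                if (elapsed >= timeoutMs) {
                    throw new TimeoutException(
                        "gave up after " + elapsed + " ms: " + e.getMessage());
                }
                // Back off briefly before the next attempt.
                Thread.sleep(backoffMs);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulates a deleted topic: every lookup fails with a retriable error.
        Supplier<Long> deletedTopic = () -> {
            throw new RetriableError("unknown topic or partition");
        };
        try {
            lookupWithRetry(deletedTopic, 200L, 20L);
        } catch (TimeoutException e) {
            System.out.println("timed out instead of hanging: " + e.getMessage());
        }
    }
}
```

Passing Long.MAX_VALUE as timeoutMs in the main method above would reproduce the hang in miniature, since the simulated lookup never succeeds.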

Here is a reproducer:
{code}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;

public class KafkaReproducer {

  public static void main(String[] args) {
    // Prerequisites: the broker must have "delete.topic.enable" set to true,
    // and the topic "test" must be created manually with 3 partitions.
    // The hang does not reproduce when the topic has only one partition.
    String topic = "test";

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "testgroup");
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("enable.auto.commit", "false");

    KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);

    kc.subscribe(Collections.singletonList(topic));

    // Poll once so the consumer joins the group and receives its assignment.
    kc.poll(0);
    Set<TopicPartition> partitions = kc.assignment();
    System.out.println("partitions: " + partitions);
    kc.pause(partitions);
    kc.seekToEnd(partitions);

    System.out.println("please delete the topic in 30 seconds");
    try {
      // Sleep 30 seconds to give us enough time to delete the topic.
      Thread.sleep(30000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("sleep end");
    // After the topic has been deleted, position() blocks here and never returns.
    for (TopicPartition p : partitions) {
      System.out.println(p + " offset: " + kc.position(p));
    }
    System.out.println("cannot reach here");
    kc.close();
  }
}
{code}



