Hello!

I'm using Kafka 0.9.1. Suppose that I have created a topic "my-topic" with 1 partition. With the following code, I get a StaleMetadataException in the Fetcher's listOffset method, and the thread is blocked in an infinite loop (while (true)).

I ran into this by mistake, by assigning the consumer to a partition that does not exist. What should happen in this case? Should the client check in Zookeeper that the partition really exists and throw an exception if it does not, or should it retry up to a maximum number of attempts (or until a timeout expires) and then leave the loop? I think a JIRA should be raised.

Regards,
Florin
import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BadAssignedPartitionKConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("client.id", UUID.randomUUID().toString());
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "my-topic";

        // "my-topic" has a single partition, so partition 6 does not exist.
        consumer.assign(Arrays.asList(new TopicPartition(topic, 6)));
        // This call never returns.
        consumer.position(new TopicPartition(topic, 6));

        consumer.close();
    }
}

Here is the Fetcher's method:

private long listOffset(TopicPartition partition, long timestamp) {
    while (true) { // infinite loop
        RequestFuture<Long> future = sendListOffsetRequest(partition, timestamp);
        client.poll(future);

        if (future.succeeded())
            return future.value();

        if (!future.isRetriable())
            throw future.exception();

        if (future.exception() instanceof InvalidMetadataException)
            // Here I got the StaleMetadataException, and we never escape the loop.
            client.awaitMetadataUpdate();
        else
            time.sleep(retryBackoffMs);
    }
}
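
To make the "timeout" option concrete, here is a rough sketch of a retry loop that gives up after a deadline instead of spinning forever. It is plain Java and not Kafka code: the class BoundedRetry, the method retryWithTimeout and its parameters are names I made up for illustration, and the Supplier only stands in for "send the request and see whether it succeeded".

```java
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper, not part of the Kafka client.
public class BoundedRetry {

    /**
     * Calls attempt repeatedly until it yields a value or timeoutMs has elapsed.
     * An empty Optional means "retriable failure, try again after backoffMs".
     */
    public static <T> T retryWithTimeout(Supplier<Optional<T>> attempt,
                                         long timeoutMs,
                                         long backoffMs)
            throws TimeoutException, InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            Optional<T> result = attempt.get();
            if (result.isPresent())
                return result.get();

            // Leave the loop once the deadline has passed.
            long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0)
                throw new TimeoutException("No result after " + timeoutMs + " ms");

            // Never sleep past the deadline.
            Thread.sleep(Math.min(backoffMs, remainingMs));
        }
    }

    public static void main(String[] args) throws Exception {
        // An attempt that never succeeds stands in for an offset lookup
        // on a partition that does not exist.
        try {
            retryWithTimeout(() -> Optional.<Long>empty(), 200, 50);
        } catch (TimeoutException e) {
            System.out.println("Gave up: " + e.getMessage());
        }
    }
}
```

With something along these lines, the caller of position() would get an exception after the timeout rather than a blocked thread. Whether a timeout, a retry limit or an up-front existence check is the better fix for the Fetcher is exactly the open question above.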