Hello!
 I'm using Kafka 0.9.1. Suppose that I have created a topic "my-topic" with
1 partition.With the following code, I got StaleMetadataException  in
Fetcher->listOffset method and the thread is blocked in an infinite while
loop (while true).
I came to this error by mistake, so what to do in this cases? Should we do
a check in Zookeeper that partition really exists and if not then thrown an
exception or should we try for a max retries or timeout exceeds and escape
the infinite loop?
I think a JIRA should be raised.
Regards,
 Florin

public class BadAssignedPartitionKConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers",
"localhost:9092");
props.put("group.id", "test");

props.put("client.id", UUID.randomUUID().toString());
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "my-topic";
*consumer.assign(Arrays.asList(new TopicPartition(topic, 6)));*
* consumer.position(new TopicPartition(topic, 6));*

consumer.close();

}
}

Here is the Fetcher's method
private long listOffset(TopicPartition partition, long timestamp) {
*        while (true) {//infinte loop*
            RequestFuture<Long> future = sendListOffsetRequest(partition,
timestamp);
            client.poll(future);

            if (future.succeeded())
                return future.value();

            if (!future.isRetriable())
                throw future.exception();

*            if (future.exception() instanceof InvalidMetadataException)*
*                client.awaitMetadataUpdate();//here I got the *
*StaleMetadataException
and we will not escape the loop*
            else
                time.sleep(retryBackoffMs);
        }
    }

Reply via email to