Using the new Kafka 0.9 consumer, I would like to start consuming from the beginning or the end of a topic without specifying auto.offset.reset.
This does not seem to be possible:

```scala
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> conf.getString("kafka.brokers"),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example",
  "auto.offset.reset" -> "none"
).asJava

val topics = conf.getString("kafka.topics").split(",").toList.asJava
val consumer = new KafkaConsumer[String, String](kafkaParams)
consumer.subscribe(topics)
consumer.seekToBeginning()
consumer.poll(0)
```

This results in:

```
Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtwo-4
	at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
	at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
	at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
```

I'm assuming this happens because subscriptions.assignedPartitions isn't populated yet at the time seekToBeginning() is called. But polling in order to get TopicPartitions assigned raises this error, which creates a chicken-or-egg situation. I don't want to set auto.offset.reset, because I want a hard error if the offsets are out of range at any other time during consumption.
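For reference, the pattern I've seen suggested for this situation is to defer the seek until partitions have actually been assigned, by passing a ConsumerRebalanceListener to subscribe() and seeking inside onPartitionsAssigned. Below is a sketch, not verified against the exact setup above; the broker address (localhost:9092) and topic name (testtwo) are assumptions, and seekToBeginning uses the 0.9 varargs signature. It needs a running broker to execute.

```scala
import java.util.{Collection => JCollection, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object SeekToBeginningSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumed broker address
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)
    props.put("group.id", "example")
    // "none" is kept so out-of-range offsets still fail hard later in consumption.
    props.put("auto.offset.reset", "none")

    val consumer = new KafkaConsumer[String, String](props)

    // By the time onPartitionsAssigned fires (inside poll), the consumer
    // actually owns the partitions, so the seek is valid and the position
    // is set before the first fetch -- avoiding NoOffsetForPartitionException.
    consumer.subscribe(List("testtwo").asJava, new ConsumerRebalanceListener {
      override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
        consumer.seekToBeginning(partitions.asScala.toSeq: _*) // 0.9 varargs signature
      override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = ()
    })

    // The first poll triggers the group join, which invokes the callback above.
    val records = consumer.poll(1000)
    records.asScala.foreach(r => println(s"${r.offset}: ${r.value}"))
    consumer.close()
  }
}
```

Seeking to the end should work the same way with seekToEnd in the callback.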