Using the 0.9 consumer, I would like to start consuming at the
beginning or end, without specifying auto.offset.reset.

This does not seem to be possible:

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> conf.getString("kafka.brokers"),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "none"
    ).asJava
    val topics = conf.getString("kafka.topics").split(",").toList.asJava
    val consumer = new KafkaConsumer[String, String](kafkaParams)
    consumer.subscribe(topics)
    consumer.seekToBeginning()
    consumer.poll(0)


Results in:

Exception in thread "main"
org.apache.kafka.clients.consumer.NoOffsetForPartitionException:
Undefined offset with no reset policy for partition: testtwo-4
        at 
org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
        at 
org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
        at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)


I'm assuming this is because, at the time seekToBeginning() is called,
subscriptions.assignedPartitions isn't populated.  But polling in
order to assign topicpartitions results in an error, which creates a
chicken-or-the-egg situation.

I don't want to set auto.offset.reset, because I want a hard error if
the offsets are out of range at any other time during consumption.

Reply via email to