Hey Phil,

You've stumbled onto one of the tricky aspects of the new consumer that
we've been talking about recently. KafkaConsumer.subscribe() is
asynchronous in the sense that it will return before partitions have been
assigned. We could make it synchronous, but we wouldn't be able to
guarantee how long the assignment would be active since other members of
the group or metadata changes can cause the coordinator to rebalance the
assignment. The best place to perform a seek would probably be in the
rebalance callback, which can be passed through the alternative subscribe
API. The code might look something like this:

consumer.subscribe(topics, new RebalanceListener() {
  void onPartitionsAssigned(List<TopicPartition> partitions) {
    // seek to the initial offset for the assigned partitions here
  void onPartitionsRevoked(List<TopicPartition> partitions) {
    // commit offsets if you need to

while (true) {
  ConsumerRecords records = consumer.poll(100);
  // do stuff with records

Does that make sense?


On Tue, Sep 8, 2015 at 2:59 PM, Phil Steitz <phil.ste...@gmail.com> wrote:

> I have been experimenting with the KafkaConsumer currently in
> development [1].  Sorry if this should be a question for the user
> list, but I am not sure if what I am seeing is something not working
> yet or if I am misunderstanding the API.  If I use
> KafkaConsumer#subscribe to subscribe to a topic and then try to use
> seek(TopicPartion, offset) to position the consumer, I get an
> IllegalStateException with message "No current assignment for
> partition ...."  If I use assign instead to connect to the topic,
> things work fine.  I can see why this is by looking at the
> SubscriptionState code which is throwing the ISE because
> SubscriptionState#seek expects to find an assignment, but
> KafkaConsumer#subscribe does not make any.
> I know this is unreleased code and I am not looking for help here -
> actually more like looking *to* help but just learning the code.
> Happy to open a ticket with a test case if that will help or a patch
> to the javadoc if I am misunderstanding the API and it can be made
> clearer.
> Thanks!
> Phil
> [1] ff189fa05ccdacac100f3d15d167dcbe561f57a6

