I am using the Java Kafka 0.9 client. When I subscribe to a topic, I provide
a ConsumerRebalanceListener. In its onPartitionsAssigned method I seek each
newly assigned partition to its current position:
    partitions.foreach( (tp: TopicPartition) => {
      consumer.seek(tp, consumer.position(tp))
    })
However, sometimes I end up in an infinite loop in which
IllegalStateExceptions are thrown [1] with the message:
    No current assignment for partition <topic-partition>
I thought it was safe to seek here, because the partitions should already
have been assigned to the consumer by the time this method is invoked. Am I
missing something?
For what it's worth, I am manually committing offsets (using commitSync).
[1] https://apache.googlesource.com/kafka/+/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#228