John Roesler created KAFKA-7000:
-----------------------------------

             Summary: KafkaConsumer.position should wait for assignment metadata
                 Key: KAFKA-7000
                 URL: https://issues.apache.org/jira/browse/KAFKA-7000
             Project: Kafka
          Issue Type: Improvement
            Reporter: John Roesler
            Assignee: John Roesler


While updating Kafka Streams to stop using the deprecated Consumer.poll(long), 
I found that this code unexpectedly throws an exception:
{code:java}
consumer.subscribe(topics);
// consumer.poll(0); <- I've removed this line, which shouldn't be necessary here.

final Set<TopicPartition> partitions = new HashSet<>();
for (final String topic : topics) {
    for (final PartitionInfo partition : consumer.partitionsFor(topic)) {
        partitions.add(new TopicPartition(partition.topic(), partition.partition()));
    }
}

for (final TopicPartition tp : partitions) {
    final long offset = consumer.position(tp);
    committedOffsets.put(tp, offset);
}{code}
Here is the exception:
{code:java}
Exception in thread "main" java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer.
   at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1620)
   at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1586)
   at org.apache.kafka.streams.tests.EosTestDriver.getCommittedOffsets(EosTestDriver.java:275)
   at org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:148)
   at org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:69){code}
 

As the commented-out line in my snippet shows, we used to block for 
assignment with a poll(0), which is now deprecated.

It seems reasonable to me for position() to do the same thing that poll() does, 
which is to call `coordinator.poll(timeout.toMillis())` early in processing to 
ensure an up-to-date assignment.
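
To illustrate the proposed ordering, here is a toy model in plain Java. It is 
only a sketch: FakeCoordinator and FakeConsumer are hypothetical stand-ins 
invented for this example, not Kafka classes, and the assumption that the 
assignment arrives on the first coordinator poll is made up to keep it small. 
It contrasts checking the assignment directly with refreshing it first.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Toy model only: none of these classes exist in Kafka.
public class PositionWaitSketch {

    /** Hypothetical stand-in for the group coordinator. */
    static class FakeCoordinator {
        private final Set<String> pending;
        private final Set<String> assigned = new HashSet<>();
        private int pollsUntilAssigned;

        FakeCoordinator(Set<String> pending, int pollsUntilAssigned) {
            this.pending = pending;
            this.pollsUntilAssigned = pollsUntilAssigned;
        }

        // Models a coordinator poll: each call advances the fake rebalance by
        // one step. The toy ignores the timeout value.
        void poll(long timeoutMs) {
            if (pollsUntilAssigned > 0 && --pollsUntilAssigned == 0) {
                assigned.addAll(pending);
            }
        }

        boolean isAssigned(String partition) {
            return assigned.contains(partition);
        }
    }

    /** Hypothetical stand-in for the consumer. */
    static class FakeConsumer {
        private final FakeCoordinator coordinator;
        private final boolean refreshFirst;

        FakeConsumer(FakeCoordinator coordinator, boolean refreshFirst) {
            this.coordinator = coordinator;
            this.refreshFirst = refreshFirst;
        }

        long position(String partition, Duration timeout) {
            if (refreshFirst) {
                // Proposed ordering: refresh assignment metadata before checking it.
                coordinator.poll(timeout.toMillis());
            }
            if (!coordinator.isAssigned(partition)) {
                throw new IllegalStateException(
                    "You can only check the position for partitions assigned to this consumer.");
            }
            return 0L; // placeholder offset; the toy does not track real positions
        }
    }

    public static void main(String[] args) {
        Set<String> partitions = Collections.singleton("topic-0");
        Duration timeout = Duration.ofMillis(100);

        // No refresh: the assignment has not arrived yet, so position() throws.
        FakeConsumer current = new FakeConsumer(new FakeCoordinator(partitions, 1), false);
        try {
            current.position("topic-0", timeout);
        } catch (IllegalStateException e) {
            System.out.println("without refresh: " + e.getMessage());
        }

        // Refresh first: the coordinator poll delivers the assignment.
        FakeConsumer proposed = new FakeConsumer(new FakeCoordinator(partitions, 1), true);
        System.out.println("with refresh: " + proposed.position("topic-0", timeout));
    }
}
```

In the real client a rebalance may take more than one coordinator poll, so an 
actual change would presumably need to keep polling until the assignment is 
known or the timeout expires.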



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
