John Roesler created KAFKA-7000:
-----------------------------------

Summary: KafkaConsumer.position should wait for assignment metadata
Key: KAFKA-7000
URL: https://issues.apache.org/jira/browse/KAFKA-7000
Project: Kafka
Issue Type: Improvement
Reporter: John Roesler
Assignee: John Roesler

While updating Kafka Streams to stop using the deprecated Consumer.poll(long), I found that this code unexpectedly throws an exception:

{code:java}
consumer.subscribe(topics);
// consumer.poll(0); <- I've removed this line, which shouldn't be necessary here.

final Set<TopicPartition> partitions = new HashSet<>();
for (final String topic : topics) {
    for (final PartitionInfo partition : consumer.partitionsFor(topic)) {
        partitions.add(new TopicPartition(partition.topic(), partition.partition()));
    }
}

for (final TopicPartition tp : partitions) {
    final long offset = consumer.position(tp);
    committedOffsets.put(tp, offset);
}
{code}

Here is the exception:

{code:java}
Exception in thread "main" java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer.
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1620)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1586)
    at org.apache.kafka.streams.tests.EosTestDriver.getCommittedOffsets(EosTestDriver.java:275)
    at org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:148)
    at org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:69)
{code}

As you can see in the commented code in my snippet, we used to block for assignment with a poll(0), which is now deprecated.

It seems reasonable to me for position() to do the same thing that poll() does, which is call `coordinator.poll(timeout.toMillis())` early in processing to ensure an up-to-date assignment.
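
Until position() waits for assignment on its own, one possible caller-side workaround is to keep polling with the non-deprecated overload until the consumer reports a non-empty assignment. The following is a minimal sketch of that idea, not the proposed fix. The class and method names (AssignmentWaiter, awaitAssignment) are hypothetical, and the helper is written against plain functional interfaces so that it compiles without the Kafka client on the classpath. It assumes that any records returned by the intermediate polls can be discarded, which may not hold for every caller.

```java
import java.time.Duration;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical helper: poll until an assignment shows up or a timeout elapses. */
public final class AssignmentWaiter {

    private AssignmentWaiter() {}

    /**
     * Runs pollOnce repeatedly until assignment returns a non-empty set.
     *
     * With a real consumer (assumption, not shown here), the arguments could be
     *   pollOnce   = () -> consumer.poll(Duration.ofMillis(100))
     *   assignment = consumer::assignment
     *
     * @throws IllegalStateException if nothing is assigned before the timeout
     */
    public static <T> Set<T> awaitAssignment(final Runnable pollOnce,
                                             final Supplier<Set<T>> assignment,
                                             final Duration timeout) {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            final Set<T> assigned = assignment.get();
            if (!assigned.isEmpty()) {
                return assigned;
            }
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new IllegalStateException(
                    "No partitions assigned within " + timeout);
            }
            // Each poll gives the consumer a chance to join the group.
            pollOnce.run();
        }
    }
}
```

In the snippet above, such a helper would take the place of the removed poll(0) line, and the loop over partitions could then iterate over the returned assignment instead of the result of partitionsFor(), so that position() is only called for partitions this consumer actually owns.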