[ https://issues.apache.org/jira/browse/KAFKA-7000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lucas Brutschy reassigned KAFKA-7000: ------------------------------------- Assignee: Lucas Brutschy (was: Lucas Brutschy) > KafkaConsumer.position should wait for assignment metadata > ---------------------------------------------------------- > > Key: KAFKA-7000 > URL: https://issues.apache.org/jira/browse/KAFKA-7000 > Project: Kafka > Issue Type: Improvement > Components: clients > Reporter: John Roesler > Assignee: Lucas Brutschy > Priority: Blocker > Fix For: 2.0.0 > > > While updating Kafka Streams to stop using the deprecated > Consumer.poll(long), I found that this code unexpectedly throws an exception: > {code:java} > consumer.subscribe(topics); > // consumer.poll(0); <- I've removed this line, which shouldn't be necessary > here. > final Set<TopicPartition> partitions = new HashSet<>(); > for (final String topic : topics) { > for (final PartitionInfo partition : consumer.partitionsFor(topic)) { > partitions.add(new TopicPartition(partition.topic(), > partition.partition())); > } > } > for (final TopicPartition tp : partitions) { > final long offset = consumer.position(tp); > committedOffsets.put(tp, offset); > }{code} > Here is the exception: > {code:java} > Exception in thread "main" java.lang.IllegalStateException: You can only > check the position for partitions assigned to this consumer. > at > org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1620) > at > org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1586) > at > org.apache.kafka.streams.tests.EosTestDriver.getCommittedOffsets(EosTestDriver.java:275) > at > org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:148) > at > org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:69){code} > > As you can see in the commented code in my snippet, we used to block for > assignment with a poll(0), which is now deprecated. > It seems reasonable to me for position() to do the same thing that poll() > does, which is call `coordinator.poll(timeout.toMillis())` early in > processing to ensure an up-to-date assignment. -- This message was sent by Atlassian Jira (v8.20.10#820010)