[ 
https://issues.apache.org/jira/browse/KAFKA-7000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-7000:
-------------------------------------

    Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> KafkaConsumer.position should wait for assignment metadata
> ----------------------------------------------------------
>
>                 Key: KAFKA-7000
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7000
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: John Roesler
>            Assignee: Lucas Brutschy
>            Priority: Blocker
>             Fix For: 2.0.0
>
>
> While updating Kafka Streams to stop using the deprecated 
> Consumer.poll(long), I found that this code unexpectedly throws an exception:
> {code:java}
> consumer.subscribe(topics);
> // consumer.poll(0); <- I've removed this line, which shouldn't be necessary here.
> final Set<TopicPartition> partitions = new HashSet<>();
> for (final String topic : topics) {
>     for (final PartitionInfo partition : consumer.partitionsFor(topic)) {
>         partitions.add(new TopicPartition(partition.topic(), partition.partition()));
>     }
> }
> for (final TopicPartition tp : partitions) {
>     final long offset = consumer.position(tp);
>     committedOffsets.put(tp, offset);
> }{code}
> Here is the exception:
> {code:java}
> Exception in thread "main" java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer.
>    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1620)
>    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1586)
>    at org.apache.kafka.streams.tests.EosTestDriver.getCommittedOffsets(EosTestDriver.java:275)
>    at org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:148)
>    at org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:69){code}
>  
> As you can see in the commented code in my snippet, we used to block for 
> assignment with a poll(0), which is now deprecated.
> It seems reasonable to me for position() to do the same thing that poll() 
> does, which is to call `coordinator.poll(timeout.toMillis())` early in 
> processing to ensure an up-to-date assignment.
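
The "block until an assignment exists" behavior described above can be sketched on the caller side as a bounded wait loop. This is only an illustration under stated assumptions, not the fix applied in the Kafka client: `AssignmentWaiter` and `awaitAssignment` are hypothetical names, and the helper takes a generic supplier and a poll action so it compiles without the Kafka dependency.

```java
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper for illustration; not part of the Kafka client API.
final class AssignmentWaiter {

    private AssignmentWaiter() {}

    /**
     * Runs pollOnce repeatedly until assignment returns a non-empty set,
     * or throws TimeoutException once the timeout has elapsed.
     * The assignment is always checked at least once before giving up.
     */
    static <T> Set<T> awaitAssignment(final Supplier<Set<T>> assignment,
                                      final Runnable pollOnce,
                                      final Duration timeout) throws TimeoutException {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            final Set<T> current = assignment.get();
            if (!current.isEmpty()) {
                return current; // partitions are assigned; caller may proceed
            }
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new TimeoutException("No partitions assigned within " + timeout);
            }
            // Assumption: each call drives the group coordinator forward,
            // as a consumer poll would.
            pollOnce.run();
        }
    }
}
```

With a real consumer, a call might look like `AssignmentWaiter.awaitAssignment(consumer::assignment, () -> consumer.poll(Duration.ofMillis(100)), Duration.ofSeconds(30))`. Note that a real poll may also return records, which this sketch would silently drop while advancing the position, so it is not a substitute for position() waiting on assignment metadata itself.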



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
