[ 
https://issues.apache.org/jira/browse/KAFKA-7000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16505305#comment-16505305
 ] 

ASF GitHub Bot commented on KAFKA-7000:
---------------------------------------

vvcephei closed pull request #5142: KAFKA-7000: update assignment in Consumer#position
URL: https://github.com/apache/kafka/pull/5142
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 5bd6b935b39..e109f88f2a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1611,6 +1611,7 @@ public long position(TopicPartition partition, final Duration timeout) {
         final long timeoutMs = timeout.toMillis();
         acquireAndEnsureOpen();
         try {
+            coordinator.poll(timeout.toMillis());
             if (!this.subscriptions.isAssigned(partition))
                 throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");
             Long offset = this.subscriptions.position(partition);
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 372cc3ffed6..55234b17e59 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -181,7 +181,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumers += consumer0
 
     var commitCompleted = false
-    var committedPosition: Long = -1
 
     val listener = new TestConsumerReassignmentListener {
       override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
@@ -190,8 +189,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
           // than session timeout and then try a commit. We should still be in the group,
           // so the commit should succeed
           Utils.sleep(1500)
-          committedPosition = consumer0.position(tp)
-          consumer0.commitSync(Map(tp -> new OffsetAndMetadata(committedPosition)).asJava)
+          consumer0.commitSync(Map(tp -> new OffsetAndMetadata(0)).asJava)
           commitCompleted = true
         }
         super.onPartitionsRevoked(partitions)
@@ -207,7 +205,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer0.subscribe(List("otherTopic").asJava, listener)
     consumer0.poll(0)
 
-    assertEquals(0, committedPosition)
     assertTrue(commitCompleted)
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KafkaConsumer.position should wait for assignment metadata
> ----------------------------------------------------------
>
>                 Key: KAFKA-7000
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7000
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Blocker
>             Fix For: 2.0.0
>
>
> While updating Kafka Streams to stop using the deprecated 
> Consumer.poll(long), I found that this code unexpectedly throws an exception:
> {code:java}
> consumer.subscribe(topics);
> // consumer.poll(0); <- I've removed this line, which shouldn't be necessary here.
> final Set<TopicPartition> partitions = new HashSet<>();
> for (final String topic : topics) {
>     for (final PartitionInfo partition : consumer.partitionsFor(topic)) {
>         partitions.add(new TopicPartition(partition.topic(), partition.partition()));
>     }
> }
> for (final TopicPartition tp : partitions) {
>     final long offset = consumer.position(tp);
>     committedOffsets.put(tp, offset);
> }{code}
> Here is the exception:
> {code:java}
> Exception in thread "main" java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer.
>    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1620)
>    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1586)
>    at org.apache.kafka.streams.tests.EosTestDriver.getCommittedOffsets(EosTestDriver.java:275)
>    at org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:148)
>    at org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:69){code}
>  
> As you can see in the commented code in my snippet, we used to block for 
> assignment with a poll(0), which is now deprecated.
> It seems reasonable to me for position() to do the same thing that poll() 
> does, which is to call `coordinator.poll(timeout.toMillis())` early in 
> processing to ensure an up-to-date assignment.
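
The behavior described in the issue can be illustrated with a small, self-contained toy model. This is only a sketch and does not use the Kafka client: `ToyCoordinator`, `ToyConsumer`, and the `pollBeforePosition` flag are hypothetical names invented for illustration, and the toy `poll` always completes the pending assignment, which a real coordinator poll is not guaranteed to do within its timeout.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Toy model only (not the Kafka client): assignment changes only when the coordinator is polled. */
public class PositionSketch {

    /** Hypothetical stand-in for the consumer coordinator. */
    static final class ToyCoordinator {
        private final Set<String> pending = new HashSet<>();
        private final Set<String> assigned = new HashSet<>();

        void subscribe(Set<String> partitions) {
            pending.addAll(partitions);
        }

        /** Stand-in for coordinator.poll(timeoutMs); assumed to always complete the assignment. */
        void poll(long timeoutMs) {
            assigned.addAll(pending);
            pending.clear();
        }

        boolean isAssigned(String partition) {
            return assigned.contains(partition);
        }
    }

    /** Hypothetical consumer whose position() optionally polls the coordinator first. */
    static final class ToyConsumer {
        private final ToyCoordinator coordinator = new ToyCoordinator();
        private final boolean pollBeforePosition;

        ToyConsumer(boolean pollBeforePosition) {
            this.pollBeforePosition = pollBeforePosition;
        }

        void subscribe(Set<String> partitions) {
            coordinator.subscribe(partitions);
        }

        long position(String partition, long timeoutMs) {
            if (pollBeforePosition)
                coordinator.poll(timeoutMs); // mirrors the line added by the patch
            if (!coordinator.isAssigned(partition))
                throw new IllegalStateException(
                    "You can only check the position for partitions assigned to this consumer.");
            return 0L; // toy offset; a real consumer would look up the fetch position
        }
    }

    public static void main(String[] args) {
        // Without the coordinator poll, the assignment is still pending and position() throws.
        ToyConsumer withoutPoll = new ToyConsumer(false);
        withoutPoll.subscribe(Collections.singleton("topic-0"));
        try {
            withoutPoll.position("topic-0", 100L);
        } catch (IllegalStateException e) {
            System.out.println("without poll: " + e.getMessage());
        }

        // With the coordinator poll, the assignment is refreshed before the check.
        ToyConsumer withPoll = new ToyConsumer(true);
        withPoll.subscribe(Collections.singleton("topic-0"));
        System.out.println("with poll: " + withPoll.position("topic-0", 100L));
    }
}
```

The only difference between the two toy consumers is whether the coordinator is polled before the assignment check, which is the change the issue proposes for position().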



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
