[ https://issues.apache.org/jira/browse/KAFKA-4547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15799561#comment-15799561 ]
Vahid Hashemian commented on KAFKA-4547:
----------------------------------------

So here's what I've found so far. The inconsistency in behavior seems to have started with [this PR|https://github.com/apache/kafka/pull/1720]: the refactoring of the method {{KafkaConsumer.updateFetchPositions}} changed the outcome of this scenario.

To be more specific, I started with a fresh Kafka cluster and ran these steps:
# Started ZooKeeper and a Kafka broker.
# Created a topic {{foo}} with 3 partitions.
# Ran the consumer code provided to check offsets. They are 0,0,0.
# Ran the producer code to produce messages.
# Ran the consumer code again. The offsets are now 0,0,2.

Consider what happens in step 5: the consumer code checks the offset position of the partitions one by one.

------------------------
So it starts with {{foo-0}}, which has no valid offset position. Therefore, a call is made to [{{updateFetchPositions(Collections.singleton(partition));}}|https://github.com/apache/kafka/blob/be36b322749003581474e2c84a3ec9ba2aaec53c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1221]. In that [method|https://github.com/apache/kafka/blob/be36b322749003581474e2c84a3ec9ba2aaec53c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1428]:
* {{fetcher.resetOffsetsIfNeeded(partitions);}} resets the offset of {{foo-0}} and seeks to the latest offset (which is 2).
* {{subscriptions.hasAllFetchPositions()}} is false, because two other partitions are still waiting for an offset reset, so the {{if}} block runs:
** {{coordinator.refreshCommittedOffsetsIfNeeded();}} updates the committed offsets of all partitions to {{foo-0}}: 0, {{foo-1}}: 0, {{foo-2}}: 0.
** {{fetcher.updateFetchPositions(partitions);}} updates the fetch position of {{foo-0}} only, based on its committed offset, which is 0. So it seeks to offset 0 of {{foo-0}} (which will be the reported offset for that partition).

------------------------
Then it's {{foo-1}}'s turn, which has no offset position yet. Again, a call to {{updateFetchPositions(Collections.singleton(partition));}} is made.
* {{fetcher.resetOffsetsIfNeeded(partitions);}}, similar to above, resets the offset of {{foo-1}} and seeks to the latest offset (which is 2).
* {{subscriptions.hasAllFetchPositions()}} is still false (since {{foo-2}} still has no offset position):
** {{coordinator.refreshCommittedOffsetsIfNeeded();}} does nothing, since the committed offsets were already set above.
** {{fetcher.updateFetchPositions(partitions);}} updates the fetch position of {{foo-1}} based on its committed offset, which is 0. So it seeks to offset 0 of {{foo-1}} (which will be the reported offset for that partition).

------------------------
Finally it's {{foo-2}}'s turn, which has no offset position yet. Another call to {{updateFetchPositions(Collections.singleton(partition));}} is made.
* {{fetcher.resetOffsetsIfNeeded(partitions);}}, similar to above, resets the offset of {{foo-2}} and seeks to the latest offset (which is 2).
* {{subscriptions.hasAllFetchPositions()}} is now true, since all partitions now have valid positions. So the {{if}} block does not run, and 2 will be the reported offset for {{foo-2}}.

------------------------
Pausing the partitions matters for reproducing the issue because the seek in the last call ({{fetcher.updateFetchPositions(partitions);}}) for partitions {{foo-0}} and {{foo-1}} would not occur if those partitions were un-paused (resumed), due to [this {{if}} block|https://github.com/apache/kafka/blob/be36b322749003581474e2c84a3ec9ba2aaec53c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L177]. A potential workaround would be to modify the {{if}} block condition as [here|https://github.com/apache/kafka/compare/trunk...vahidhashemian:KAFKA-4547?expand=1] to avoid unnecessary commits and seeks.
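The per-partition sequence walked through above can be condensed into a small, self-contained Java model. This is a sketch under stated assumptions, not Kafka code: the class {{PositionModel}} and its methods are hypothetical names used only for illustration, every partition is assumed to have committed offset 0 and latest offset 2 (as in the reproduction), and the {{Fetcher}} check is approximated as "skip a partition that is not paused and already has a valid position", which is a simplified reading of the linked {{if}} block.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the 0.10.1.0 position() path described above.
// Method names mirror the Kafka internals in the walkthrough, but this is NOT Kafka code.
// Assumption: every partition has committed offset 0 and latest (log-end) offset 2.
public class PositionModel {
    static final long LATEST = 2L;    // where resetOffsetsIfNeeded() seeks after seekToEnd()
    static final long COMMITTED = 0L; // what refreshCommittedOffsetsIfNeeded() yields

    private final Long[] fetchPosition;   // null = no valid position yet
    private final boolean[] resetPending; // set by seekToEnd(), cleared by the reset
    private final boolean paused;         // true = partitions left paused (resume() commented out)

    PositionModel(int partitions, boolean paused) {
        this.fetchPosition = new Long[partitions];
        this.resetPending = new boolean[partitions];
        this.paused = paused;
        // seekToEnd(...) only marks partitions for a reset; no offsets are looked up yet
        Arrays.fill(resetPending, true);
    }

    // Stand-in for subscriptions.hasAllFetchPositions()
    private boolean hasAllFetchPositions() {
        for (Long pos : fetchPosition) {
            if (pos == null) {
                return false;
            }
        }
        return true;
    }

    // Stand-in for KafkaConsumer.updateFetchPositions(Collections.singleton(p))
    private void updateFetchPositions(int p) {
        // fetcher.resetOffsetsIfNeeded(...): seek p to the latest offset
        if (resetPending[p]) {
            fetchPosition[p] = LATEST;
            resetPending[p] = false;
        }
        if (!hasAllFetchPositions()) {
            // fetcher.updateFetchPositions(...): a fetchable partition (not paused,
            // valid position) is skipped; otherwise it is moved to the committed
            // offset, overwriting the reset above
            boolean fetchable = !paused && fetchPosition[p] != null;
            if (!fetchable) {
                fetchPosition[p] = COMMITTED;
            }
        }
    }

    // Stand-in for consumer.position(p)
    long position(int p) {
        if (fetchPosition[p] == null) {
            updateFetchPositions(p);
        }
        return fetchPosition[p];
    }

    // Calls position() on each partition in order, like the loop in the issue
    public static List<Long> reportedPositions(int partitions, boolean paused) {
        PositionModel model = new PositionModel(partitions, paused);
        List<Long> result = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            result.add(model.position(p));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("paused:  " + reportedPositions(3, true));  // [0, 0, 2]
        System.out.println("resumed: " + reportedPositions(3, false)); // [2, 2, 2]
    }
}
```

Under these assumptions the model reproduces both 0.10.1.0 results from the issue: 0,0,2 when the partitions stay paused, and 2,2,2 when they are resumed, because only the last partition queried sees {{hasAllFetchPositions()}} return true.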
It would be great if [~hachikuji] could weigh in here too.

> Consumer.position returns incorrect results for Kafka 0.10.1.0 client
> ---------------------------------------------------------------------
>
> Key: KAFKA-4547
> URL: https://issues.apache.org/jira/browse/KAFKA-4547
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.10.1.0
> Environment: Windows Kafka 0.10.1.0
> Reporter: Pranav Nakhe
> Assignee: Vahid Hashemian
> Labels: clients
> Fix For: 0.10.2.0
>
> Attachments: issuerep.zip
>
>
> Consider the following code -
> KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
> List<TopicPartition> listOfPartitions = new ArrayList<>();
> for (int i = 0; i < consumer.partitionsFor("IssueTopic").size(); i++) {
>     listOfPartitions.add(new TopicPartition("IssueTopic", i));
> }
> consumer.assign(listOfPartitions);
> consumer.pause(listOfPartitions);
> consumer.seekToEnd(listOfPartitions);
> // consumer.resume(listOfPartitions); -- commented out
> for (int i = 0; i < listOfPartitions.size(); i++) {
>     System.out.println(consumer.position(listOfPartitions.get(i)));
> }
>
> I have created a topic IssueTopic with 3 partitions and a single replica on my single-node Kafka installation (0.10.1.0).
> The behavior noticed for Kafka client 0.10.1.0 as against Kafka client 0.10.0.1:
> A) Initially, when there are no messages on IssueTopic, running the above program returns
> 0.10.1.0
> 0
> 0
> 0
> 0.10.0.1
> 0
> 0
> 0
> B) Next I send 6 messages and see that the messages have been evenly distributed across the three partitions. Running the above program now returns
> 0.10.1.0
> 0
> 0
> 2
> 0.10.0.1
> 2
> 2
> 2
> Clearly there is a difference in behavior for the 2 clients.
> Now, after the seekToEnd call, if I make a call to resume (uncomment the resume call in the code above), then the behavior is
> 0.10.1.0
> 2
> 2
> 2
> 0.10.0.1
> 2
> 2
> 2
> This is an issue I came across when using the Spark Kafka integration for 0.10. When I started using Kafka 0.10.1.0, I began seeing this issue. I had raised a pull request to resolve that issue [SPARK-18779], but looking at the Kafka client implementation/documentation now, it seems the issue is with Kafka and not with Spark. There does not seem to be any documentation which specifies or implies that we need to call resume after seekToEnd for position to return the correct value. Also, there is a clear difference in behavior between the two Kafka client implementations.