[ 
https://issues.apache.org/jira/browse/KAFKA-4547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15799561#comment-15799561
 ] 

Vahid Hashemian commented on KAFKA-4547:
----------------------------------------

So here's what I've found so far.

The inconsistency in behavior seems to have been introduced by [this 
PR|https://github.com/apache/kafka/pull/1720]. Due to the refactoring of 
{{KafkaConsumer.updateFetchPositions}}, the outcome of this scenario changed.

To be more specific, I started with a fresh Kafka cluster, and ran these steps:

# Started ZooKeeper and a Kafka broker.
# Created a topic {{foo}} with 3 partitions.
# Ran the consumer code provided to check offsets. They are 0,0,0.
# Ran the producer code to produce messages.
# Ran the consumer code again. The offsets are now 0,0,2.

If we consider what's happening in step 5, the consumer code checks the offset 
position of the partitions one by one (a trimmed sketch of that code follows).
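
The sketch below is a trimmed adaptation of the reporter's code (quoted at the 
bottom of this issue) to the topic {{foo}} used above; the property values are 
just placeholders.

{code:java}
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // placeholder
props.put("group.id", "offset-check");              // placeholder
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
List<TopicPartition> partitions = new ArrayList<>();
for (PartitionInfo info : consumer.partitionsFor("foo"))
    partitions.add(new TopicPartition("foo", info.partition()));

consumer.assign(partitions);
consumer.pause(partitions);      // partitions stay paused
consumer.seekToEnd(partitions);  // only marks the partitions for an offset reset;
                                 // the actual lookup happens lazily in position()/poll()
// consumer.resume(partitions);  // left commented out, as in the reporter's code

for (TopicPartition tp : partitions)
    System.out.println(tp + " -> " + consumer.position(tp)); // calls updateFetchPositions
consumer.close();
{code}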

------------------------
So it starts with {{foo-0}}:

This partition has no valid offset position. Therefore, a call is made to 
[{{updateFetchPositions(Collections.singleton(partition));}}|https://github.com/apache/kafka/blob/be36b322749003581474e2c84a3ec9ba2aaec53c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1221].

In that 
[method|https://github.com/apache/kafka/blob/be36b322749003581474e2c84a3ec9ba2aaec53c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1428] 
(paraphrased after this list):

* {{fetcher.resetOffsetsIfNeeded(partitions);}} resets the offset of {{foo-0}} 
and seeks to the latest offset (which is 2)

* {{subscriptions.hasAllFetchPositions()}} is false, because there are two 
other partitions still waiting for an offset reset, so the {{if}} block runs

** {{coordinator.refreshCommittedOffsetsIfNeeded();}} updates committed offsets 
of all partitions to {{foo-0}}: 0, {{foo-1}}: 0, {{foo-2}}: 0

** {{fetcher.updateFetchPositions(partitions);}} updates the fetch position of 
{{foo-0}} only, based on its committed offset, which is 0. So it seeks to offset 
0 of {{foo-0}} (which becomes the reported offset for that partition)
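
To put the steps above together, the flow inside 
{{KafkaConsumer.updateFetchPositions}} for the {{foo-0}} call is roughly the 
following. This is only a paraphrase of the linked method to illustrate the 
sequence, not the exact source:

{code:java}
private void updateFetchPositions(Set<TopicPartition> partitions) {
    // 1. Reset offsets for partitions that seekToEnd() marked for reset.
    //    For position(foo-0) this seeks foo-0 to the latest offset (2).
    fetcher.resetOffsetsIfNeeded(partitions);

    // 2. foo-1 and foo-2 still have no position, so this is false and the block runs.
    if (!subscriptions.hasAllFetchPositions()) {
        // 2a. Fetches committed offsets for all assigned partitions: 0, 0, 0.
        coordinator.refreshCommittedOffsetsIfNeeded();

        // 2b. Seeks the requested partition(s) to their committed offset.
        //     For foo-0 this overwrites the freshly reset position 2 with 0.
        fetcher.updateFetchPositions(partitions);
    }
}
{code}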

------------------------
Then it's {{foo-1}}'s turn, which has no offset position yet:

Again, a call to {{updateFetchPositions(Collections.singleton(partition));}} is 
made.

* {{fetcher.resetOffsetsIfNeeded(partitions);}}, as above, resets the offset of 
{{foo-1}} and seeks to the latest offset (which is 2)

* {{subscriptions.hasAllFetchPositions()}} is still false (since {{foo-2}} 
still has no offset position)

** {{coordinator.refreshCommittedOffsetsIfNeeded();}} does nothing, since 
committed offsets are already set above

** {{fetcher.updateFetchPositions(partitions);}} updates the fetch position of 
{{foo-1}} based on its committed offset, which is 0. So it seeks to offset 0 of 
{{foo-1}} (which becomes the reported offset for that partition)

------------------------
Finally it's {{foo-2}}'s turn, which has no offset position yet:

Another call to {{updateFetchPositions(Collections.singleton(partition));}} is 
made.

* {{fetcher.resetOffsetsIfNeeded(partitions);}}, as above, resets the offset of 
{{foo-2}} and seeks to the latest offset (which is 2)

* {{subscriptions.hasAllFetchPositions()}} is now true, since all partitions 
have valid positions. So the {{if}} block does not run, and 2 is the reported 
offset for {{foo-2}}.

------------------------

The significance of pausing the partitions when reproducing the issue is that 
the seek in the last call ({{fetcher.updateFetchPositions(partitions);}}) for 
partitions {{foo-0}} and {{foo-1}} would not occur if those partitions were 
un-paused (resumed), due to [this {{if}} 
block|https://github.com/apache/kafka/blob/be36b322749003581474e2c84a3ec9ba2aaec53c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L177].
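
For context, my reading of that {{if}} block is that it skips a partition only 
when it is assigned and currently fetchable, along the lines of the sketch 
below. The exact predicates are from memory, so treat them as an assumption and 
check the linked source:

{code:java}
for (TopicPartition tp : partitions) {
    // A paused partition is not "fetchable", so even though foo-0/foo-1 already have
    // valid positions from the reset, they are NOT skipped here and end up being
    // seeked back to their committed offset (0). A resumed partition with a valid
    // position is skipped, which is why un-pausing changes the reported positions.
    if (!subscriptions.isAssigned(tp) || subscriptions.isFetchable(tp))
        continue;

    // ... otherwise seek tp to its committed offset (or apply the offset reset policy)
}
{code}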

A potential workaround would be to modify the {{if}} block condition as in 
[this branch comparison|https://github.com/apache/kafka/compare/trunk...vahidhashemian:KAFKA-4547?expand=1] 
to avoid unnecessary commits and seeks.
It would be great if [~hachikuji] could weigh in here too.
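
In the meantime, on the application side, resuming the partitions before 
querying positions sidesteps the problem, which matches what the reporter 
observed with the commented-out {{resume}} call (reusing the {{consumer}} and 
{{partitions}} from the snippet earlier):

{code:java}
consumer.pause(partitions);
consumer.seekToEnd(partitions);
consumer.resume(partitions);  // un-paused partitions are skipped by the if block above,
                              // so the reset positions are not overwritten

for (TopicPartition tp : partitions)
    System.out.println(tp + " -> " + consumer.position(tp)); // now reports the end offsets
{code}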

> Consumer.position returns incorrect results for Kafka 0.10.1.0 client
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-4547
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4547
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.1.0
>         Environment: Windows Kafka 0.10.1.0
>            Reporter: Pranav Nakhe
>            Assignee: Vahid Hashemian
>              Labels: clients
>             Fix For: 0.10.2.0
>
>         Attachments: issuerep.zip
>
>
> Consider the following code -
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>     List<TopicPartition> listOfPartitions = new ArrayList<>();
>     for (int i = 0; i < consumer.partitionsFor("IssueTopic").size(); i++) {
>         listOfPartitions.add(new TopicPartition("IssueTopic", i));
>     }
>     consumer.assign(listOfPartitions);
>     consumer.pause(listOfPartitions);
>     consumer.seekToEnd(listOfPartitions);
>     // consumer.resume(listOfPartitions); -- commented out
>     for (int i = 0; i < listOfPartitions.size(); i++) {
>         System.out.println(consumer.position(listOfPartitions.get(i)));
>     }
> I have created a topic IssueTopic with 3 partitions and a single replica on 
> my single-node Kafka installation (0.10.1.0).
> The behavior noticed for the Kafka 0.10.1.0 client, as against the Kafka 
> 0.10.0.1 client, is as follows:
> A) Initially, when there are no messages on IssueTopic, running the above 
> program returns:
> 0.10.1.0: 0, 0, 0
> 0.10.0.1: 0, 0, 0
> B) Next I send 6 messages and see that the messages have been evenly 
> distributed across the three partitions. Running the above program now 
> returns:
> 0.10.1.0: 0, 0, 2
> 0.10.0.1: 2, 2, 2
> Clearly there is a difference in behavior between the two clients.
> Now, if after the seekToEnd call I make a call to resume (uncomment the 
> resume call in the code above), then the behavior is:
> 0.10.1.0: 2, 2, 2
> 0.10.0.1: 2, 2, 2
> This is an issue I came across when using the Spark Kafka integration for 
> 0.10. When I used Kafka 0.10.1.0 I started seeing this issue. I had raised a 
> pull request to resolve it [SPARK-18779], but looking at the Kafka client 
> implementation/documentation now, it seems the issue is with Kafka and not 
> with Spark. There does not seem to be any documentation that specifies or 
> implies that we need to call resume after seekToEnd for position to return 
> the correct value. Also, there is a clear difference in the behavior of the 
> two Kafka client implementations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
