Dong Lin created KAFKA-6262:
-------------------------------

             Summary: Consumer should not use metadata that is older than the existing metadata
                 Key: KAFKA-6262
                 URL: https://issues.apache.org/jira/browse/KAFKA-6262
             Project: Kafka
          Issue Type: Improvement
            Reporter: Dong Lin
            Assignee: Dong Lin


Currently, the following sequence of events can cause a consumer to rewind to 
the earliest offset even though no log truncation has happened in Kafka. This 
is a problem for MirrorMaker (MM): it can fall significantly behind and 
duplicate a large amount of data.

- Say partition P has three replicas, on brokers 1, 2 and 3. Broker 1 is the 
leader. Initially all three are in the ISR, and both the HW (high watermark) 
and the LEO (log end offset) are 10.

- An SRE performs a controlled shutdown of broker 1. The controller sends a 
LeaderAndIsrRequest to all three brokers with leader = broker 2 and isr_set = 
[broker 2, broker 3].

- Brokers 2 and 3 receive and process the LeaderAndIsrRequest almost 
instantaneously, so they can now accept ProduceRequest and FetchRequest for 
partition P. However, broker 1 has not yet processed the LeaderAndIsrRequest 
because of a backlog in its request queue, so broker 1 still thinks it is the 
leader for partition P.

- Because leadership has moved, a consumer receives 
NotLeaderForPartitionException, which causes it to send a MetadataRequest to a 
randomly selected broker, say broker 2. Broker 2 tells the consumer that 
broker 2 is the leader for partition P, and the consumer fetches data of 
partition P from broker 2. The latest data it fetches has offset 20.

- Later, this consumer receives NotLeaderForPartitionException for another 
partition and again sends a MetadataRequest to a randomly selected broker. 
This time it picks broker 1, which tells the consumer that broker 1 is the 
leader for partition P.

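- The consumer issues a FetchRequest for partition P at offset 21. Broker 1 
returns OffsetOutOfRangeException because it thinks the LogEndOffset for this 
partition is 10. The consumer then resets its offset according to 
auto.offset.reset; with auto.offset.reset=earliest, it rewinds to the earliest 
offset.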
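The race above can be reproduced with a small Python simulation. This is a 
minimal sketch, not Kafka client code: Broker, Consumer, OffsetOutOfRange and 
their fields are hypothetical stand-ins, each broker answers metadata requests 
from its own (possibly stale) view of the leader, and the earliest offset is 
assumed to be 0.

```python
from dataclasses import dataclass


class OffsetOutOfRange(Exception):
    """Hypothetical stand-in for Kafka's OffsetOutOfRangeException."""


@dataclass
class Broker:
    broker_id: int
    believed_leader: int  # which broker this broker *thinks* leads partition P
    log_end_offset: int   # LEO of this broker's replica of P

    def metadata(self) -> int:
        # Answer a MetadataRequest for P from this broker's own view.
        return self.believed_leader

    def fetch(self, offset: int) -> int:
        # Serve a FetchRequest; an offset beyond the local LEO is out of range.
        if offset > self.log_end_offset:
            raise OffsetOutOfRange(offset)
        return offset


@dataclass
class Consumer:
    position: int                     # next offset to fetch
    earliest_offset: int = 0          # assumption: log starts at offset 0
    auto_offset_reset: str = "earliest"
    leader: int = -1                  # leader of P per the latest metadata

    def refresh_metadata(self, broker: Broker) -> None:
        # Blindly trust whatever the chosen broker says (the current behavior).
        self.leader = broker.metadata()

    def poll(self, brokers: dict[int, Broker]) -> None:
        # Position advancement after a successful fetch is omitted for brevity.
        try:
            brokers[self.leader].fetch(self.position)
        except OffsetOutOfRange:
            if self.auto_offset_reset == "earliest":
                self.position = self.earliest_offset


brokers = {
    1: Broker(1, believed_leader=1, log_end_offset=10),  # has not processed LeaderAndIsrRequest
    2: Broker(2, believed_leader=2, log_end_offset=21),  # new leader; latest data has offset 20
    3: Broker(3, believed_leader=2, log_end_offset=21),
}
consumer = Consumer(position=21)
consumer.refresh_metadata(brokers[2])  # first refresh happens to hit broker 2
consumer.poll(brokers)                 # OK: offset 21 equals broker 2's LEO
consumer.refresh_metadata(brokers[1])  # second refresh happens to hit stale broker 1
consumer.poll(brokers)                 # out of range on broker 1, so rewind
print(consumer.leader, consumer.position)  # 1 0
```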

There are two possible solutions to this problem. The long-term solution is 
probably to include a version in the MetadataResponse so that the consumer can 
tell whether the metadata is outdated. This requires a KIP.
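A minimal sketch of the long-term idea, assuming a hypothetical monotonically 
increasing version field in the MetadataResponse (the real field name and its 
semantics would be defined by the KIP): the consumer refuses to replace its 
metadata with a response whose version is lower than the one it already holds. 
The version numbers below are made up for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class MetadataResponse:
    version: int      # hypothetical metadata version, not an existing Kafka field
    leader_of_p: int  # leader of partition P according to the responding broker


class ConsumerMetadata:
    def __init__(self) -> None:
        self.current: Optional[MetadataResponse] = None

    def update(self, response: MetadataResponse) -> bool:
        """Apply the response unless it is older than the metadata already held."""
        if self.current is not None and response.version < self.current.version:
            return False  # outdated: keep the newer metadata
        self.current = response
        return True


meta = ConsumerMetadata()
print(meta.update(MetadataResponse(version=5, leader_of_p=2)))  # from broker 2: True
print(meta.update(MetadataResponse(version=4, leader_of_p=1)))  # from stale broker 1: False
print(meta.current.leader_of_p)                                 # 2
```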

The short-term solution, which should solve the problem in most cases, is to 
have the consumer keep fetching metadata from the same (initially randomly 
picked) broker until its connection to that broker is closed. As long as the 
consumer keeps fetching metadata from the same broker, the metadata version 
will not go back in time.
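The short-term fix can be sketched as a sticky broker selector. 
StickyMetadataBrokerSelector and its methods are hypothetical names chosen for 
illustration; they are not part of the Kafka client API.

```python
import random
from typing import Optional, Sequence


class StickyMetadataBrokerSelector:
    """Hypothetical helper: pick a random broker once, then reuse it for every
    MetadataRequest until the connection to that broker is closed."""

    def __init__(self, brokers: Sequence[int], rng: Optional[random.Random] = None) -> None:
        self.brokers = list(brokers)
        self.rng = rng if rng is not None else random.Random()
        self.current: Optional[int] = None

    def broker_for_metadata(self) -> int:
        # Only choose randomly when there is no sticky broker yet.
        if self.current is None:
            self.current = self.rng.choice(self.brokers)
        return self.current

    def on_disconnect(self, broker_id: int) -> None:
        # Losing the sticky broker's connection triggers a new random pick later.
        if broker_id == self.current:
            self.current = None


selector = StickyMetadataBrokerSelector([1, 2, 3], rng=random.Random(42))
first = selector.broker_for_metadata()
picks = {selector.broker_for_metadata() for _ in range(10)}
print(picks == {first})      # True: every refresh goes to the same broker
selector.on_disconnect(first)  # the next refresh will pick a broker at random again
```

Because every refresh goes to the same broker, and per the reasoning above a 
single broker's metadata does not go back in time, the consumer never swaps 
newer metadata for older. After a disconnect, though, the next random pick can 
still land on a stale broker, which is why this only solves the problem in 
most cases.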

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
