[ https://issues.apache.org/jira/browse/KAFKA-1430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13985159#comment-13985159 ]

Jun Rao commented on KAFKA-1430:
--------------------------------

The following is a proposed redesign of Purgatory.

1. In RequestPurgatory, we replace
       watch(delayedRequest: T)
   with
       checkAndMaybeWatch(delayedRequest: T, request: R): Boolean
checkAndMaybeWatch() will use request to check if delayedRequest can be
satisfied immediately. If so, it will return true and will not add
delayedRequest to the watchers. Otherwise, it will return false and add
delayedRequest to the watchers. The check and the add will be done atomically
inside the synchronization point in Watchers.add.
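
To illustrate the intended atomicity, here is a minimal, self-contained Scala
sketch; DelayedRequest, Watchers and checkSatisfied are simplified stand-ins
for the real Kafka classes, not the actual implementation.

    class DelayedRequest
    // simplified stand-in for the per-key Watchers inside RequestPurgatory
    class Watchers[T <: DelayedRequest] {
      private val requests = scala.collection.mutable.ListBuffer[T]()
      def add(r: T): Unit = requests += r
    }

    abstract class RequestPurgatory[T <: DelayedRequest, R] {
      private val watchers = new Watchers[T]

      // subclasses define when delayedRequest is satisfied by the incoming request
      protected def checkSatisfied(delayedRequest: T, request: R): Boolean

      // the check and the add happen under the same lock, so a request can never
      // be registered after it has already become satisfiable
      def checkAndMaybeWatch(delayedRequest: T, request: R): Boolean =
        watchers.synchronized {
          if (checkSatisfied(delayedRequest, request))
            true                            // satisfied now; do not watch
          else {
            watchers.add(delayedRequest)    // not yet satisfied; register watcher
            false
          }
        }
    }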

2. Log
2.1 Change Log.append() so that it additionally returns the end file position 
of the last appended message.
2.2 Change Log.read() so that it additionally returns the file position for the 
offset used in the fetch request.
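
As a sketch of the shape of the extended return values (these case classes and
field names are illustrative assumptions, not the actual Log API):

    // 2.1: the append result additionally carries the byte position right after
    // the last appended message
    case class LogAppendResult(firstOffset: Long, lastOffset: Long, endFilePosition: Long)

    // 2.2: the read result additionally carries the byte position of the
    // requested fetch offset
    case class LogReadResult(fetchOffset: Long, fetchOffsetPosition: Long)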

3. Partition/Replica
3.1 We maintain the logEndPosition in each replica.
3.2 In appendMessage(), we additionally pass in the end file position returned 
from Log.append() and save it in the leader replica.
3.3 We pass the file position returned by Log.read() from 
KafkaApis.handleFetchRequest() all the way to 
Partition.updateLeaderHWAndMaybeExpandIsr() and save it in the follower replica.
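
A rough sketch of how a replica could track this (field and method names are
assumptions for illustration):

    // each Replica keeps a byte position alongside its log end offset; the
    // leader replica is updated from Log.append(), a follower replica from the
    // position carried with that follower's fetch request
    class Replica(val brokerId: Int) {
      @volatile var logEndOffset: Long = 0L
      @volatile var logEndPosition: Long = 0L

      def updateLogEnd(offset: Long, position: Long): Unit = {
        logEndOffset = offset
        logEndPosition = position
      }
    }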

4. To unblock a pending regular consumer fetch request:
4.1 When creating a delayed fetch request, we pass in its fetchOffsetPosition.
4.2 In Partition.maybeIncrementLeaderHW(), maintain a highWatermarkPosition. 
Every time the high watermark moves, we move highWatermarkPosition as well. We 
then call FetchPurgatory.update() to unblock regular consumer fetch requests. 
The check can now be done by simply comparing the difference between 
highWatermarkPosition and fetchOffsetPosition against the fetch request's 
minimum bytes.
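
The consumer-side satisfaction check then reduces to simple arithmetic on byte
positions; a sketch (the explicit minBytes parameter is an assumption about how
"enough data" is expressed):

    // a pending consumer fetch is satisfiable once enough committed bytes have
    // accumulated past the position it is fetching from
    def consumerFetchSatisfied(highWatermarkPosition: Long,
                               fetchOffsetPosition: Long,
                               minBytes: Int): Boolean =
      highWatermarkPosition - fetchOffsetPosition >= minBytes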

5. To unblock a pending follower fetch request:
5.1 When creating a delayed fetch request, we pass in its fetchOffsetPosition.
5.2 After each local log append in the leader, we call FetchPurgatory.update() 
to unblock follower fetch requests, by passing in the end file position 
returned from Log.append(). The check can now be done by simply comparing the 
difference between the end file position and fetchOffsetPosition against the 
fetch request's minimum bytes.
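
The follower case is the same arithmetic, but against the leader's log end
position rather than the high watermark; a sketch (again assuming a minBytes
threshold):

    // a pending follower fetch is satisfiable once the leader's log has grown
    // far enough past the position the follower is fetching from
    def followerFetchSatisfied(leaderLogEndPosition: Long,
                               fetchOffsetPosition: Long,
                               minBytes: Int): Boolean =
      leaderLogEndPosition - fetchOffsetPosition >= minBytes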

We also need to handle fetch requests that are not on the last log segment. One 
solution is to maintain the file position as the accumulated byte position 
since the first segment. We can store a startPosition value in each log 
segment. Note that this value only needs to be maintained in memory; we can 
re-initialize it after a broker restart.
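
A sketch of how the accumulated positions could be derived from per-segment
sizes (SegmentInfo and the helper names are assumptions for illustration):

    case class SegmentInfo(baseOffset: Long, sizeInBytes: Long, startPosition: Long = 0L)

    // startPosition of a segment = sum of the sizes of all earlier segments;
    // kept only in memory and recomputed on broker restart
    def assignStartPositions(segments: Seq[SegmentInfo]): Seq[SegmentInfo] =
      segments.scanLeft(0L)((acc, seg) => acc + seg.sizeInBytes)
              .zip(segments)
              .map { case (start, seg) => seg.copy(startPosition = start) }

    // the accumulated file position of a byte offset within a given segment
    def accumulatedPosition(segment: SegmentInfo, positionInSegment: Long): Long =
      segment.startPosition + positionInSegment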

> Purgatory redesign
> ------------------
>
>                 Key: KAFKA-1430
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1430
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 0.8.2
>            Reporter: Jun Rao
>
> We have seen 2 main issues with the Purgatory.
> 1. There is no atomic checkAndWatch functionality. So, a client typically 
> first checks whether a request is satisfied or not and then registers the 
> watcher. However, by the time the watcher is registered, the registered item 
> could already be satisfied. This item won't be satisfied until the next 
> update happens or the delayed time expires, which means the watched item 
> could be delayed. 
> 2. FetchRequestPurgatory doesn't quite work. This is because the current 
> design tries to incrementally maintain the accumulated bytes ready for fetch. 
> However, this is difficult since the right time to check whether a fetch 
> request (for a regular consumer) is satisfied is when the high watermark 
> moves. At 
> that point, it's hard to figure out how many bytes we should incrementally 
> add to each pending fetch request.
> The problem has been reported in KAFKA-1150 and KAFKA-703.


