[ https://issues.apache.org/jira/browse/KAFKA-46?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13280225#comment-13280225 ]

Jun Rao commented on KAFKA-46:
------------------------------

Thanks for the patch. Overall, a very encouraging patch given the complexity of 
this jira. Some comments:

From previous reviews:
Were 4.1 and 4.2 addressed in the patch? I still see CUR and reassignedReplicas.
For 4.4, I think highWatermarkUpdateTime can be used as described in 15.2 below.
For 6, I meant that all local variable names should also be prefixed with 
follower.

New review comments:
11. KafkaApis:
11.1 handleFetchRequest(): if the leader of one partition is not on this
broker, we reject the whole request. Ideally, we should just send the error
code for that partition in the response and fulfill the rest of the request
(see the sketch after 11.3).
11.2 handleFetchRequest() and readMessageSets(): If the leader is not on this
broker, we should probably return a new type of error like NotLeaderException,
instead of using InvalidPartitionException or throwing IllegalStateException.
11.3 readMessageSets(): add a comment explaining what -1 means for replicaId
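
Roughly what I have in mind for 11.1 and 11.2, with placeholder names (none of
these are the actual request/response classes in the patch):

  // Sketch only: PartitionFetchInfo/PartitionFetchResponse and the error codes
  // are illustrative placeholders, not the real classes.
  case class PartitionFetchInfo(topic: String, partition: Int, offset: Long)
  case class PartitionFetchResponse(partition: Int, errorCode: Int, messages: Seq[Array[Byte]])

  val NoError = 0
  val NotLeaderForPartitionCode = 6   // hypothetical code for the new NotLeaderException error

  def handleFetch(infos: Seq[PartitionFetchInfo],
                  isLeader: (String, Int) => Boolean,
                  read: PartitionFetchInfo => Seq[Array[Byte]]): Seq[PartitionFetchResponse] =
    infos.map { info =>
      if (!isLeader(info.topic, info.partition))
        // only this partition carries the error; the rest of the request is still served
        PartitionFetchResponse(info.partition, NotLeaderForPartitionCode, Seq.empty)
      else
        PartitionFetchResponse(info.partition, NoError, read(info))
    }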

12. ReplicaManager:
12.1 remove unused imports
12.2 maybeIncrementLeaderHW(): if(newHw < oldHw) should be if(newHw > oldHw)
12.3 We will need to either synchronize the methods in this class or use 
ConcurrentHashMap for allReplicas since allReplicas can be read and updated 
concurrently.
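
For 12.2 and 12.3 together, something along these lines is what I'm thinking
of (names are placeholders for the patch's ReplicaManager internals):

  import java.util.concurrent.ConcurrentHashMap
  import java.util.concurrent.atomic.AtomicLong

  // Sketch only: a concurrent map plus an atomic HW per partition avoids
  // having to synchronize every method that touches allReplicas.
  class ReplicaManagerSketch {
    private val leaderHWs = new ConcurrentHashMap[(String, Int), AtomicLong]()

    // 12.2: the HW should only ever move forward, hence newHw > oldHw
    def maybeIncrementLeaderHW(topic: String, partition: Int, newHw: Long): Unit = {
      val hw = leaderHWs.computeIfAbsent((topic, partition), _ => new AtomicLong(0L))
      val oldHw = hw.get()
      if (newHw > oldHw)
        hw.compareAndSet(oldHw, newHw)
    }
  }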

13. Replica:
13.1 hw(): to be consistent, we should probably throw IllegalStateException, 
instead of InvalidPartitionException.

14. KafkaServer:
14.1 There are a couple of TODOs. Will they be addressed in this jira or 
separate jiras?

15. ISRExpirationThread:
15.1 It seems that when the time expires, we always update the ISR. We should 
only update ISR if it actually shrinks.
15.2 Currently, we take a replica out of the ISR if its LEO is less than
leaderHW after keepInSync time. We probably should use the following condition
instead (see the sketch after 15.4):
  leaderHW - r.leo > keepInSyncBytes || currentTime - r.highWatermarkUpdateTime > keepInSyncTime
  The first clause handles a slow follower and the second clause handles a
stuck follower.
15.3 I think we can potentially get rid of the inner while loop by putting all
the logic for when the time expires in an if statement and the awaitUtil part
in the else clause of that if statement.
15.4 Also, instead of using a priority queue and continually adding and
deleting partitions from it, would it be simpler to have the thread just check
the isInsyncCondition for each partition every keepInSyncTime?
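
To make 15.2 and 15.4 concrete, a rough sketch of the check I mean (all names
are placeholders, and keepInSyncBytes is whatever byte threshold we pick):

  // Sketch only: placeholder types, not the patch's Replica/Partition classes.
  case class ReplicaState(leo: Long, highWatermarkUpdateTimeMs: Long)

  // 15.2: a replica falls out of the ISR if it is too far behind in bytes (slow)
  // or has not caught up to the HW for too long (stuck).
  def outOfSync(leaderHW: Long, r: ReplicaState, nowMs: Long,
                keepInSyncBytes: Long, keepInSyncTimeMs: Long): Boolean =
    (leaderHW - r.leo > keepInSyncBytes) ||
    (nowMs - r.highWatermarkUpdateTimeMs > keepInSyncTimeMs)

  // 15.4: one loop that re-evaluates every partition each keepInSyncTime,
  // instead of maintaining a priority queue of per-partition expiration times.
  def isrCheckLoop(partitions: () => Map[Int, (Long, Map[Int, ReplicaState])],
                   shrinkIsr: (Int, Set[Int]) => Unit,
                   keepInSyncBytes: Long, keepInSyncTimeMs: Long): Unit =
    while (true) {
      val now = System.currentTimeMillis()
      for ((partition, (leaderHW, replicas)) <- partitions()) {
        val laggers = replicas.collect {
          case (brokerId, r) if outOfSync(leaderHW, r, now, keepInSyncBytes, keepInSyncTimeMs) => brokerId
        }.toSet
        if (laggers.nonEmpty)             // 15.1: only update the ISR if it actually shrinks
          shrinkIsr(partition, laggers)
      }
      Thread.sleep(keepInSyncTimeMs)
    }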

16. LogDisk: recoverUptoLastCheckpointedHW(): 
16.1 The second condition in
          segments.view.find(segment => lastKnownHW >= segment.start && lastKnownHW < segment.size)
     seems incorrect. It seems that you want
     "lastKnownHW < segment.messageSet.getEndOffset" instead (see the sketch
     after 16.2).
16.2 The files of all deleted segments should be removed from disk, as is done
in LogManager.cleanupExpiredSegments().
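
For 16.1, the intent (with a placeholder segment type) would be something like:

  // Sketch only: SegmentSketch stands in for a log segment; endOffset plays the
  // role of segment.messageSet.getEndOffset, as opposed to segment.size.
  case class SegmentSketch(start: Long, endOffset: Long)

  def segmentContainingHW(segments: Seq[SegmentSketch], lastKnownHW: Long): Option[SegmentSketch] =
    segments.find(s => lastKnownHW >= s.start && lastKnownHW < s.endOffset)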

17. LogOffsetTest:
17.1 There is no need to keep testEmptyLogs(), since we have a test that covers 
fetching from a non-existing topic using SimpleConsumer.

18. PrimitiveApiTest:
18.1 testConsumerNotExistTopic(): we probably shouldn't create the topic in 
this case.

19. ProducerTest:
19.1 testZKSendToNewTopic(): We should fix the comment that says "Available
partition ids should be 0, 1, 2 and 3", since there is only 1 partition
created.

20. ReplicaFetchTest:
20.1 Since the test is already using in-memory log, we can remove TODO in 
testReplicaFetcherThread().
20.2 testReplicaFetcherThread(): Instead of sleeping and then checking
log.get.getLogEndOffset, could we create a utility method that keeps checking
until the LEO reaches a certain value, up to a certain max wait time? Maybe we
should make a more general util that waits up to a certain amount of time
until a condition is satisfied.
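
For 20.2, the general utility could look something like this (waitUntilTrue is
just a suggested name):

  // Sketch only: polls a condition until it holds or the max wait time elapses.
  def waitUntilTrue(condition: () => Boolean, maxWaitMs: Long, pauseMs: Long = 100L): Boolean = {
    val deadline = System.currentTimeMillis() + maxWaitMs
    while (System.currentTimeMillis() < deadline) {
      if (condition())
        return true
      Thread.sleep(pauseMs)
    }
    condition()   // one last check after the deadline
  }

  // e.g., instead of sleeping in testReplicaFetcherThread():
  //   assertTrue(waitUntilTrue(() => log.get.getLogEndOffset == expectedOffset, 5000L))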

                
> Commit thread, ReplicaFetcherThread for intra-cluster replication
> -----------------------------------------------------------------
>
>                 Key: KAFKA-46
>                 URL: https://issues.apache.org/jira/browse/KAFKA-46
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jun Rao
>            Assignee: Neha Narkhede
>         Attachments: kafka-46-draft.patch, kafka-46-v1.patch
>
>
> We need to implement the commit thread at the leader and the fetcher thread 
> at the follower for replicating the data from the leader.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
