[ 
https://issues.apache.org/jira/browse/KAFKA-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17529143#comment-17529143
 ] 

RivenSun commented on KAFKA-13857:
----------------------------------

Hi [~guozhang]  Thank you for your reply.

As you said, in order to ensure data consistency between the leader replica and 
follower replicas, the leaderEpoch mechanism may truncate log files.

Other than that: if the rate of synchronization messages of all ISR replicas is 
close to the leader's write rate, the LEO and HW of the leader replica are 
almost the same value.

At present, I do not have a strong need to return LEO, but when reading the 
source code, I found that there is a code trace on the server side that 
supports returning LEO.
Due to the different roles of AdminClient and KafkaConsumer, I think 
AdminClient may can support returning LEO.

Thanks.

> The listOffsets method of KafkaAdminClient should support returning 
> logEndOffset of topicPartition
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13857
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13857
>             Project: Kafka
>          Issue Type: Improvement
>          Components: admin
>            Reporter: RivenSun
>            Priority: Major
>
> The server side currently handles the LIST_OFFSETS request process as follows:
> {code:java}
> KafkaApis.handleListOffsetRequest() ->
> KafkaApis.handleListOffsetRequestV1AndAbove() ->
> ReplicaManager.fetchOffsetForTimestamp() ->
> Partition.fetchOffsetForTimestamp(){code}
>  
> In the last method above, it is obvious that when the client side does not 
> pass the isolationLevel value, the server side supports returning 
> localLog.logEndOffset.
> {code:java}
> val lastFetchableOffset = isolationLevel match {
>   case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
>   case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
>   case None => localLog.logEndOffset
> } 
> {code}
>  
>  
> KafkaAdminClient is an operation and maintenance management tool, which 
> *should be different from the listOffsets-related methods (offsetsForTimes, 
> beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not 
> be limited by the value of {color:#ff0000}isolationLevel {color}in the 
> ListOffsetsOptions parameter.*
> In the current KafkaAdminClient.listOffsets() method, both the AdminClient 
> and the server consider isolationLevel as a required parameter:
> 1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException 
> will be thrown when AdminClient executes listOffsets() method.
> {code:java}
> ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}
> 2) The current logic for converting isolationLevel on the server side has not 
> yet handled the case where the user passes in a value that is neither 
> READ_UNCOMMITTED nor READ_COMMITTED :
> {code:java}
> val isolationLevelOpt = if (isClientRequest)
>   Some(offsetRequest.isolationLevel)
> else
>   None {code}
> {code:java}
> public IsolationLevel isolationLevel() {
>     return IsolationLevel.forId(data.isolationLevel());
> } {code}
> h1.  
> h2. Suggestion:
> Added a new enum `NONE` in IsolationLevel, only dedicated to 
> AdminClient.listOffsets() method.
> This change may cause the highestSupportedVersion of 
> ApiMessageType.LIST_OFFSETS to increase by one.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to