[ 
https://issues.apache.org/jira/browse/KAFKA-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528170#comment-17528170
 ] 

RivenSun commented on KAFKA-13857:
----------------------------------

Hi [~guozhang]  [~showuon]
Could you give some suggestions for this issue?
Thanks.

> The listOffsets method of KafkaAdminClient should support returning 
> logEndOffset of topicPartition
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13857
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13857
>             Project: Kafka
>          Issue Type: Improvement
>          Components: admin
>            Reporter: RivenSun
>            Priority: Major
>
> The server side currently handles the LIST_OFFSETS request process as follows:
> {code:java}
> KafkaApis.handleListOffsetRequest() ->
> KafkaApis.handleListOffsetRequestV1AndAbove() ->
> ReplicaManager.fetchOffsetForTimestamp() ->
> Partition.fetchOffsetForTimestamp(){code}
>  
> In the last method above, it is obvious that when the client side does not 
> pass the isolationLevel value, the server side supports returning 
> localLog.logEndOffset.
> {code:java}
> val lastFetchableOffset = isolationLevel match {
>   case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
>   case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
>   case None => localLog.logEndOffset
> } 
> {code}
>  
>  
> KafkaAdminClient is an operation and maintenance management tool, which 
> *should be different from the listOffsets-related methods (offsetsForTimes, 
> beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not 
> be limited by the value of {color:#ff0000}isolationLevel {color}in the 
> ListOffsetsOptions parameter.*
> In the current KafkaAdminClient.listOffsets() method, both the AdminClient 
> and the server consider isolationLevel as a required parameter:
> 1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException 
> will be thrown when AdminClient executes listOffsets() method.
> {code:java}
> ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}
> 2) The current logic for converting isolationLevel on the server side has not 
> yet handled the case where the user passes in a value that is neither 
> READ_UNCOMMITTED nor READ_COMMITTED :
> {code:java}
> val isolationLevelOpt = if (isClientRequest)
>   Some(offsetRequest.isolationLevel)
> else
>   None {code}
> {code:java}
> public IsolationLevel isolationLevel() {
>     return IsolationLevel.forId(data.isolationLevel());
> } {code}
> h1.  
> h2. Solution:
> Added a new enum `NONE` in IsolationLevel, dedicated to 
> AdminClient.listOffsets() method.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to