RivenSun created KAFKA-13857:
--------------------------------

             Summary: The listOffsets method of KafkaAdminClient should support 
returning logEndOffset of topicPartition
                 Key: KAFKA-13857
                 URL: https://issues.apache.org/jira/browse/KAFKA-13857
             Project: Kafka
          Issue Type: Bug
          Components: admin
            Reporter: RivenSun


The server side currently handles the LIST_OFFSETS request process as follows:
{code:java}
KafkaApis.handleListOffsetRequest() ->

KafkaApis.handleListOffsetRequestV1AndAbove() ->

ReplicaManager.fetchOffsetForTimestamp() ->

Partition.fetchOffsetForTimestamp(){code}
 

In the last method above, it is obvious that when the client side does not pass 
the isolationLevel value, the server side supports returning 
localLog.logEndOffset.
{code:java}
val lastFetchableOffset = isolationLevel match {
  case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
  case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
  case None => localLog.logEndOffset
} 

{code}
 

 

KafkaAdminClient is an operation and maintenance management tool, which *should 
be different from the listOffsets-related methods (offsetsForTimes, 
beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not be 
limited by the value of {color:#FF0000}isolationLevel {color}in the 
ListOffsetsOptions parameter.*



In the current KafkaAdminClient.listOffsets() method, both the AdminClient and 
the server consider isolationLevel as a required parameter:
1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException 
will be thrown when AdminClient executes listOffsets() method.
{code:java}
ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}

2) The current logic for converting isolationLevel on the server side has not 
yet handled the case where the user passes in a value that is neither 
READ_UNCOMMITTED nor READ_COMMITTED :
{code:java}
val isolationLevelOpt = if (isClientRequest)
  Some(offsetRequest.isolationLevel)
else
  None {code}
{code:java}
public IsolationLevel isolationLevel() {
    return IsolationLevel.forId(data.isolationLevel());
} {code}
h1. 

Solution:

Added a new enum `NONE` in IsolationLevel, dedicated to 
AdminClient.listOffsets() method.

 

 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to