[ 
https://issues.apache.org/jira/browse/KAFKA-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13857:
-----------------------------
    Description: 
The server side currently handles the LIST_OFFSETS request process as follows:
{code:java}
KafkaApis.handleListOffsetRequest() ->

KafkaApis.handleListOffsetRequestV1AndAbove() ->

ReplicaManager.fetchOffsetForTimestamp() ->

Partition.fetchOffsetForTimestamp(){code}
 

The last method above shows that when the client side does not pass an 
isolationLevel value, the server side supports returning 
localLog.logEndOffset:
{code:java}
val lastFetchableOffset = isolationLevel match {
  case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
  case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
  case None => localLog.logEndOffset
} 

{code}
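The selection logic above can be modelled outside the broker. The following is a minimal Java sketch, not the broker code itself: LastFetchableOffsetSketch and LogOffsets are hypothetical stand-ins for Partition and the local log, and the offset values are made up for illustration.

```java
import java.util.Optional;

// Hypothetical stand-in for the broker-side selection shown above.
class LastFetchableOffsetSketch {
    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    // Minimal model of the three offsets tracked by a partition's local log.
    static final class LogOffsets {
        final long lastStableOffset;
        final long highWatermark;
        final long logEndOffset;

        LogOffsets(long lastStableOffset, long highWatermark, long logEndOffset) {
            this.lastStableOffset = lastStableOffset;
            this.highWatermark = highWatermark;
            this.logEndOffset = logEndOffset;
        }
    }

    // Mirrors the Scala match: an absent isolation level selects logEndOffset.
    static long lastFetchableOffset(Optional<IsolationLevel> isolationLevel, LogOffsets log) {
        if (!isolationLevel.isPresent()) {
            return log.logEndOffset;
        }
        switch (isolationLevel.get()) {
            case READ_COMMITTED:
                return log.lastStableOffset;
            case READ_UNCOMMITTED:
                return log.highWatermark;
            default:
                throw new IllegalStateException("Unexpected isolation level: " + isolationLevel.get());
        }
    }

    public static void main(String[] args) {
        LogOffsets log = new LogOffsets(90L, 100L, 120L); // illustrative values only
        System.out.println(lastFetchableOffset(Optional.of(IsolationLevel.READ_COMMITTED), log));   // 90
        System.out.println(lastFetchableOffset(Optional.of(IsolationLevel.READ_UNCOMMITTED), log)); // 100
        System.out.println(lastFetchableOffset(Optional.empty(), log));                             // 120
    }
}
```

Only the empty case reaches logEndOffset, which is the branch a client request cannot currently select.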
 

 

KafkaAdminClient is an operations and maintenance tool, so its listOffsets() 
method *should behave differently from the listOffsets-related methods 
(offsetsForTimes, beginningOffsets, endOffsets) provided by KafkaConsumer,* 
*and it should not be limited by the value of 
{color:#ff0000}isolationLevel{color} in the ListOffsetsOptions parameter.*

In the current KafkaAdminClient.listOffsets() method, both the AdminClient and 
the server treat isolationLevel as a required parameter:
1) If the caller passes new ListOffsetsOptions(null), a NullPointerException 
is thrown when AdminClient executes the listOffsets() method:
{code:java}
ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}
2) The server-side logic that converts isolationLevel does not yet handle 
the case where the user passes in a value that is neither 
READ_UNCOMMITTED nor READ_COMMITTED:
{code:java}
val isolationLevelOpt = if (isClientRequest)
  Some(offsetRequest.isolationLevel)
else
  None {code}
{code:java}
public IsolationLevel isolationLevel() {
    return IsolationLevel.forId(data.isolationLevel());
} {code}
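Both points can be reproduced with a small self-contained Java model. This is a sketch under assumptions, not the Kafka source: IsolationLevelHandlingSketch, encode() and outcome() are hypothetical names, and the model assumes that forId() rejects an unknown id with an IllegalArgumentException.

```java
// Hypothetical model of the two cases listed above.
class IsolationLevelHandlingSketch {
    enum IsolationLevel {
        READ_UNCOMMITTED((byte) 0),
        READ_COMMITTED((byte) 1);

        private final byte id;

        IsolationLevel(byte id) {
            this.id = id;
        }

        byte id() {
            return id;
        }

        // Assumption: an id that matches no constant is rejected outright.
        static IsolationLevel forId(byte id) {
            for (IsolationLevel level : values()) {
                if (level.id == id) {
                    return level;
                }
            }
            throw new IllegalArgumentException("Unknown isolation level " + id);
        }
    }

    // Stand-in for the request-building step that calls isolationLevel.id().
    static byte encode(IsolationLevel isolationLevel) {
        return isolationLevel.id(); // dereferences null when no level was supplied
    }

    // Runs an action and reports "ok" or the simple name of the exception it threw.
    static String outcome(Runnable action) {
        try {
            action.run();
            return "ok";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Case 1: a null isolation level on the client side.
        System.out.println(outcome(() -> encode(null)));                   // NullPointerException
        // Case 2: an id that is neither READ_UNCOMMITTED nor READ_COMMITTED.
        System.out.println(outcome(() -> IsolationLevel.forId((byte) 2))); // IllegalArgumentException
    }
}
```

In this model neither path gives the caller a way to express "no isolation level", which is the gap described above.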
h2. Suggestion:

Add a new enum value `NONE` to IsolationLevel, dedicated solely to the 
AdminClient.listOffsets() method.
This change may require increasing the highestSupportedVersion of 
ApiMessageType.LIST_OFFSETS by one.
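
One possible shape of this suggestion, as a self-contained Java sketch rather than a patch: the class name IsolationLevelNoneSketch and the wire id 2 chosen for NONE are assumptions made for illustration, and nothing here is taken from an actual Kafka change.

```java
// Hypothetical sketch of an IsolationLevel extended with NONE.
class IsolationLevelNoneSketch {
    enum IsolationLevel {
        READ_UNCOMMITTED((byte) 0),
        READ_COMMITTED((byte) 1),
        NONE((byte) 2); // assumed id, for illustration only

        private final byte id;

        IsolationLevel(byte id) {
            this.id = id;
        }

        byte id() {
            return id;
        }

        static IsolationLevel forId(byte id) {
            for (IsolationLevel level : values()) {
                if (level.id == id) {
                    return level;
                }
            }
            throw new IllegalArgumentException("Unknown isolation level " + id);
        }
    }

    // With NONE available, a request can select logEndOffset explicitly.
    static long lastFetchableOffset(IsolationLevel level, long lastStableOffset,
                                    long highWatermark, long logEndOffset) {
        switch (level) {
            case READ_COMMITTED:
                return lastStableOffset;
            case READ_UNCOMMITTED:
                return highWatermark;
            case NONE:
                return logEndOffset;
            default:
                throw new IllegalArgumentException("Unexpected isolation level: " + level);
        }
    }

    public static void main(String[] args) {
        IsolationLevel decoded = IsolationLevel.forId((byte) 2);
        System.out.println(decoded);                                      // NONE
        System.out.println(lastFetchableOffset(decoded, 90L, 100L, 120L)); // 120
    }
}
```

A broker that only knows ids 0 and 1 would presumably not accept the new id, which is one reason a version bump of the request could be needed.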

 

 

  was:
The server side currently handles the LIST_OFFSETS request process as follows:
{code:java}
KafkaApis.handleListOffsetRequest() ->

KafkaApis.handleListOffsetRequestV1AndAbove() ->

ReplicaManager.fetchOffsetForTimestamp() ->

Partition.fetchOffsetForTimestamp(){code}
 

The last method above shows that when the client side does not pass an 
isolationLevel value, the server side supports returning 
localLog.logEndOffset:
{code:java}
val lastFetchableOffset = isolationLevel match {
  case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
  case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
  case None => localLog.logEndOffset
} 

{code}
 

 

KafkaAdminClient is an operations and maintenance tool, so its listOffsets() 
method *should behave differently from the listOffsets-related methods 
(offsetsForTimes, beginningOffsets, endOffsets) provided by KafkaConsumer,* 
*and it should not be limited by the value of 
{color:#ff0000}isolationLevel{color} in the ListOffsetsOptions parameter.*

In the current KafkaAdminClient.listOffsets() method, both the AdminClient and 
the server treat isolationLevel as a required parameter:
1) If the caller passes new ListOffsetsOptions(null), a NullPointerException 
is thrown when AdminClient executes the listOffsets() method:
{code:java}
ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}
2) The server-side logic that converts isolationLevel does not yet handle 
the case where the user passes in a value that is neither 
READ_UNCOMMITTED nor READ_COMMITTED:
{code:java}
val isolationLevelOpt = if (isClientRequest)
  Some(offsetRequest.isolationLevel)
else
  None {code}
{code:java}
public IsolationLevel isolationLevel() {
    return IsolationLevel.forId(data.isolationLevel());
} {code}
h2. Solution:

Add a new enum value `NONE` to IsolationLevel, dedicated to the 
AdminClient.listOffsets() method.

 

 


> The listOffsets method of KafkaAdminClient should support returning 
> logEndOffset of topicPartition
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13857
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13857
>             Project: Kafka
>          Issue Type: Improvement
>          Components: admin
>            Reporter: RivenSun
>            Priority: Major



