[ https://issues.apache.org/jira/browse/KAFKA-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
RivenSun updated KAFKA-13857:
-----------------------------
    Description:
The server side currently handles a LIST_OFFSETS request as follows:
{code:java}
KafkaApis.handleListOffsetRequest() ->
KafkaApis.handleListOffsetRequestV1AndAbove() ->
ReplicaManager.fetchOffsetForTimestamp() ->
Partition.fetchOffsetForTimestamp(){code}

The last method above shows that when no isolationLevel value is passed (the None case), the server side can return localLog.logEndOffset:
{code:java}
val lastFetchableOffset = isolationLevel match {
  case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
  case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
  case None => localLog.logEndOffset
}
{code}

KafkaAdminClient is an operations and maintenance tool. *Its listOffsets() method should behave differently from the listOffsets-related methods (offsetsForTimes, beginningOffsets, endOffsets) provided by KafkaConsumer, and it should not be limited by the value of {color:#ff0000}isolationLevel{color} in the ListOffsetsOptions parameter.*

In the current KafkaAdminClient.listOffsets() method, both the AdminClient and the server treat isolationLevel as a required parameter:

1) If the AdminClient uses new ListOffsetsOptions(null), a NullPointerException is thrown when it executes the listOffsets() method:
{code:java}
ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}

2) The server-side logic that converts isolationLevel does not handle the case where the user passes a value that is neither READ_UNCOMMITTED nor READ_COMMITTED:
{code:java}
val isolationLevelOpt = if (isClientRequest)
  Some(offsetRequest.isolationLevel)
else
  None {code}
{code:java}
public IsolationLevel isolationLevel() {
    return IsolationLevel.forId(data.isolationLevel());
} {code}

h2. Solution:

Add a new enum constant `NONE` to IsolationLevel, dedicated to the AdminClient.listOffsets() method.
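The proposed solution could be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual Kafka patch: the nested IsolationLevel enum is only a stand-in for Kafka's own class, the id value 2 for NONE is an assumption, and LogOffsets and lastFetchableOffset are hypothetical helpers that mirror the Scala match shown in the description.

```java
// Sketch only: stand-in types, not the real Kafka classes.
class ListOffsetsIsolationSketch {

    // Stand-in for Kafka's IsolationLevel enum, extended with the proposed NONE constant.
    enum IsolationLevel {
        READ_UNCOMMITTED((byte) 0),
        READ_COMMITTED((byte) 1),
        NONE((byte) 2); // proposed constant; the id value 2 is an assumption

        private final byte id;

        IsolationLevel(byte id) {
            this.id = id;
        }

        byte id() {
            return id;
        }

        // Maps a wire id back to a constant and rejects unknown ids.
        static IsolationLevel forId(byte id) {
            for (IsolationLevel level : values()) {
                if (level.id == id) {
                    return level;
                }
            }
            throw new IllegalArgumentException("Unknown isolation level " + id);
        }
    }

    // Hypothetical snapshot of the three offsets referenced in the description.
    static final class LogOffsets {
        final long lastStableOffset;
        final long highWatermark;
        final long logEndOffset;

        LogOffsets(long lastStableOffset, long highWatermark, long logEndOffset) {
            this.lastStableOffset = lastStableOffset;
            this.highWatermark = highWatermark;
            this.logEndOffset = logEndOffset;
        }
    }

    // Mirrors the Scala match: NONE takes over the role of the former None case.
    static long lastFetchableOffset(IsolationLevel level, LogOffsets log) {
        switch (level) {
            case READ_COMMITTED:
                return log.lastStableOffset;
            case READ_UNCOMMITTED:
                return log.highWatermark;
            case NONE:
                return log.logEndOffset;
            default:
                throw new IllegalArgumentException("Unhandled isolation level " + level);
        }
    }

    public static void main(String[] args) {
        LogOffsets log = new LogOffsets(90L, 100L, 120L);
        for (IsolationLevel level : IsolationLevel.values()) {
            System.out.println(level + " -> " + lastFetchableOffset(level, log));
        }
    }
}
```

With an explicit NONE constant, a caller would no longer need to pass null to express "no isolation level", which is the path that currently ends in the NullPointerException described in point 1.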
     Issue Type: Improvement  (was: Bug)

> The listOffsets method of KafkaAdminClient should support returning logEndOffset of topicPartition
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13857
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13857
>             Project: Kafka
>          Issue Type: Improvement
>          Components: admin
>            Reporter: RivenSun
>            Priority: Major