RivenSun created KAFKA-13857: -------------------------------- Summary: The listOffsets method of KafkaAdminClient should support returning logEndOffset of topicPartition Key: KAFKA-13857 URL: https://issues.apache.org/jira/browse/KAFKA-13857 Project: Kafka Issue Type: Bug Components: admin Reporter: RivenSun
The server side currently handles the LIST_OFFSETS request process as follows: {code:java} KafkaApis.handleListOffsetRequest() -> KafkaApis.handleListOffsetRequestV1AndAbove() -> ReplicaManager.fetchOffsetForTimestamp() -> Partition.fetchOffsetForTimestamp(){code} In the last method above, it is obvious that when the client side does not pass the isolationLevel value, the server side supports returning localLog.logEndOffset. {code:java} val lastFetchableOffset = isolationLevel match { case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark case None => localLog.logEndOffset } {code} KafkaAdminClient is an operation and maintenance management tool, which *should be different from the listOffsets-related methods (offsetsForTimes, beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not be limited by the value of {color:#FF0000}isolationLevel {color}in the ListOffsetsOptions parameter.* In the current KafkaAdminClient.listOffsets() method, both the AdminClient and the server consider isolationLevel as a required parameter: 1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException will be thrown when AdminClient executes listOffsets() method. {code:java} ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code} 2) The current logic for converting isolationLevel on the server side has not yet handled the case where the user passes in a value that is neither READ_UNCOMMITTED nor READ_COMMITTED : {code:java} val isolationLevelOpt = if (isClientRequest) Some(offsetRequest.isolationLevel) else None {code} {code:java} public IsolationLevel isolationLevel() { return IsolationLevel.forId(data.isolationLevel()); } {code} h1. Solution: Added a new enum `NONE` in IsolationLevel, dedicated to AdminClient.listOffsets() method. -- This message was sent by Atlassian Jira (v8.20.7#820007)