chia7712 commented on a change in pull request #9630: URL: https://github.com/apache/kafka/pull/9630#discussion_r528545588
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -1862,23 +1864,51 @@ class ReplicaManager(val config: KafkaConfig, } } - def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]): Map[TopicPartition, EpochEndOffset] = { - requestedEpochInfo.map { case (tp, partitionData) => - val epochEndOffset = getPartition(tp) match { - case HostedPartition.Online(partition) => - partition.lastOffsetForLeaderEpoch(partitionData.currentLeaderEpoch, partitionData.leaderEpoch, - fetchOnlyFromLeader = true) - - case HostedPartition.Offline => - new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) - - case HostedPartition.None if metadataCache.contains(tp) => - new EpochEndOffset(Errors.NOT_LEADER_OR_FOLLOWER, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) - - case HostedPartition.None => - new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) + def lastOffsetForLeaderEpoch( + requestedEpochInfo: Seq[OffsetForLeaderTopic] + ): Seq[OffsetForLeaderTopicResult] = { + requestedEpochInfo.map { offsetForLeaderTopic => + val partitions = offsetForLeaderTopic.partitions.asScala.map { offsetForLeaderPartition => Review comment: Could we avoid the duplicate conversion between Scala and Java? It can be rewritten with the Java stream APIs so that ```asScala``` can be avoided. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2592,25 +2596,39 @@ class KafkaApis(val requestChannel: RequestChannel, def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = { val offsetForLeaderEpoch = request.body[OffsetsForLeaderEpochRequest] - val requestInfo = offsetForLeaderEpoch.epochsByTopicPartition.asScala + val topics = offsetForLeaderEpoch.data.topics.asScala.toSeq // The OffsetsForLeaderEpoch API was initially only used for inter-broker communication and required // cluster permission. 
With KIP-320, the consumer now also uses this API to check for log truncation // following a leader change, so we also allow topic describe permission. - val (authorizedPartitions, unauthorizedPartitions) = + val (authorizedTopics, unauthorizedTopics) = if (authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME, logIfDenied = false)) - (requestInfo, Map.empty[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]) - else partitionMapByAuthorized(request.context, DESCRIBE, TOPIC, requestInfo)(_.topic) + (topics, Seq.empty[OffsetForLeaderTopic]) + else partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, topics)(_.topic) + + val endOffsetsForAuthorizedPartitions = replicaManager.lastOffsetForLeaderEpoch(authorizedTopics) + val endOffsetsForUnauthorizedPartitions = unauthorizedTopics.map { offsetForLeaderTopic => + val partitions = offsetForLeaderTopic.partitions.asScala.map { offsetForLeaderPartition => Review comment: ditto ########## File path: core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala ########## @@ -78,7 +80,19 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc callback.foreach(_.apply()) epochFetchCount += 1 lastUsedOffsetForLeaderEpochVersion = requestBuilder.latestAllowedVersion() - new OffsetsForLeaderEpochResponse(currentOffsets) + + val data = new OffsetForLeaderEpochResponseData() + currentOffsets.forEach((tp, offsetForLeaderPartition) => { + var topic = data.topics.find(tp.topic) + if (topic == null) { + topic = new OffsetForLeaderTopicResult() + .setTopic(tp.topic) + data.topics.add(topic) + } + topic.partitions.add(offsetForLeaderPartition.setPartition(tp.partition)) Review comment: Should it be ```topic.partitions.add(offsetForLeaderPartition.duplicate())```? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org