dajac commented on code in PR #12005: URL: https://github.com/apache/kafka/pull/12005#discussion_r884492699
########## core/src/main/scala/kafka/server/LeaderEndPoint.scala: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.cluster.BrokerEndPoint +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.requests.FetchRequest +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData} + +import scala.collection.Map + +/** + * This trait defines the APIs to be used to access a broker that is a leader. + */ +trait LeaderEndPoint { + + type FetchData = FetchResponseData.PartitionData + type EpochData = OffsetForLeaderEpochRequestData.OffsetForLeaderPartition + + /** + * A boolean specifying if truncation when fetching from the leader is supported + */ + def isTruncationOnFetchSupported: Boolean + + /** + * Initiate closing access to fetches from leader. + */ + def initiateClose(): Unit + + /** + * Closes access to fetches from leader. + * `initiateClose` must be called prior to invoking `close`. + */ + def close(): Unit + + /** + * The specific broker (host:port) we want to connect to. + */ + def brokerEndPoint(): BrokerEndPoint + + /** + * Given a fetchRequest, carries out the expected request and returns + * the results from fetching from the leader. + * + * @param fetchRequest The fetch request we want to carry out + * + * @return A map of topic partition -> fetch data + */ + def fetch(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] + + /** + * Fetches the log start offset of the given topic partition from the leader. + * + * @param topicPartition The topic partition that we want to fetch from + * @param currentLeaderEpoch An int representing the current leader epoch of the requester + * + * @return A long representing the earliest offset in the leader's topic partition. + */ + def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long + + /** + * Fetches the log end offset of the given topic partition from the leader. + * + * @param topicPartition The topic partition that we want to fetch from + * @param currentLeaderEpoch An int representing the current leader epoch of the requester + * + * @return A long representing the latest offset in the leader's topic partition. 
+ */ + def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long + + /** + * Fetches offset for leader epoch from the leader for each given topic partition + * + * @param partitions A map of topic partition -> leader epoch of the replica + * + * @return A map of topic partition -> end offset for a requested leader epoch + */ + def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] + + /** + * Builds a fetch request, given a partition map. + * + * @param partitionMap A map of topic partitions to their respective partition fetch state + * + * @return A ResultWithPartitions, used to create the fetchRequest for fetch. + */ + def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] Review Comment: nit: `partitions` to be consistent with `fetchEpochEndOffsets`? or the other way around? ########## core/src/main/scala/kafka/server/AbstractFetcherManager.scala: ########## @@ -68,7 +68,7 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri if (id.fetcherId >= newSize) thread.shutdown() partitionStates.forKeyValue { (topicPartition, currentFetchState) => - val initialFetchState = InitialFetchState(currentFetchState.topicId, thread.sourceBroker, + val initialFetchState = InitialFetchState(currentFetchState.topicId, thread.leader.brokerEndPoint(), Review Comment: nit: I wonder if we could omit the parenthesis when calling `brokerEndPoint`. There are two other cases below. ########## core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.api.Request +import kafka.cluster.BrokerEndPoint +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import kafka.server.QuotaFactory.UnboundedQuota +import kafka.utils.Logging +import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} +import org.apache.kafka.common.message.FetchResponseData +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, RequestUtils} + +import java.util +import java.util.Optional +import scala.collection.{Map, Seq, Set, mutable} +import scala.compat.java8.OptionConverters.RichOptionForJava8 +import scala.jdk.CollectionConverters._ + +/** + * Facilitates fetches from a local replica leader. 
+ * + * @param sourceBroker The broker (host:port) that we want to connect to + * @param brokerConfig A config file with broker related configurations + * @param replicaMgr A ReplicaManager + * @param quota The quota, used when building a fetch request + */ +class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint, + brokerConfig: KafkaConfig, + replicaMgr: ReplicaManager, + quota: ReplicaQuota) extends LeaderEndPoint with Logging { + + private val replicaId = brokerConfig.brokerId + private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes + private val fetchSize = brokerConfig.replicaFetchMaxBytes + private var inProgressPartition: Option[TopicPartition] = None + + override val isTruncationOnFetchSupported: Boolean = false + + override def initiateClose(): Unit = {} // do nothing + + override def close(): Unit = {} // do nothing + + override def brokerEndPoint(): BrokerEndPoint = sourceBroker + + override def fetch(fetchRequest: FetchRequest.Builder): collection.Map[TopicPartition, FetchData] = { + var partitionData: Seq[(TopicPartition, FetchData)] = null + val request = fetchRequest.build() + + // We can build the map from the request since it contains topic IDs and names. + // Only one ID can be associated with a name and vice versa. + val topicNames = new mutable.HashMap[Uuid, String]() + request.data.topics.forEach { topic => + topicNames.put(topic.topicId, topic.topic) + } + + def processResponseCallback(responsePartitionData: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { + partitionData = responsePartitionData.map { case (tp, data) => + val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull + val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET) + tp.topicPartition -> new FetchResponseData.PartitionData() + .setPartitionIndex(tp.topicPartition.partition) + .setErrorCode(data.error.code) + .setHighWatermark(data.highWatermark) + .setLastStableOffset(lastStableOffset) + .setLogStartOffset(data.logStartOffset) + .setAbortedTransactions(abortedTransactions) + .setRecords(data.records) + } + } + + val fetchData = request.fetchData(topicNames.asJava) + + val fetchParams = FetchParams( + requestVersion = request.version, + maxWaitMs = 0L, // timeout is 0 so that the callback will be executed immediately + replicaId = Request.FutureLocalReplicaId, + minBytes = request.minBytes, + maxBytes = request.maxBytes, + isolation = FetchLogEnd, + clientMetadata = None + ) + + replicaMgr.fetchMessages( + params = fetchParams, + fetchInfos = fetchData.asScala.toSeq, + quota = UnboundedQuota, + responseCallback = processResponseCallback + ) + + if (partitionData == null) + throw new IllegalStateException(s"Failed to fetch data for partitions ${fetchData.keySet().toArray.mkString(",")}") + + partitionData.toMap + } + + override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = { + val partition = replicaMgr.getPartitionOrException(topicPartition) + partition.localLogOrException.logStartOffset + } + + override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = { + val partition = replicaMgr.getPartitionOrException(topicPartition) + partition.localLogOrException.logEndOffset + } + + override def fetchEpochEndOffsets(partitions: collection.Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { + partitions.map { case (tp, epochData) => + try { + val endOffset = if (epochData.leaderEpoch == UNDEFINED_EPOCH) { + new EpochEndOffset() + 
.setPartition(tp.partition) + .setErrorCode(Errors.NONE.code) + } else { + val partition = replicaMgr.getPartitionOrException(tp) + partition.lastOffsetForLeaderEpoch( + currentLeaderEpoch = RequestUtils.getLeaderEpoch(epochData.currentLeaderEpoch), + leaderEpoch = epochData.leaderEpoch, + fetchOnlyFromLeader = false) + } + tp -> endOffset + } catch { + case t: Throwable => + warn(s"Error when getting EpochEndOffset for $tp", t) + tp -> new EpochEndOffset() + .setPartition(tp.partition) + .setErrorCode(Errors.forException(t).code) + } + } + } + + override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = { + // Only include replica in the fetch request if it is not throttled. + if (quota.isQuotaExceeded) { + ResultWithPartitions(None, Set.empty) + } else { + selectPartitionToFetch(partitionMap) match { + case Some((tp, fetchState)) => + buildFetchForPartition(tp, fetchState) + case None => + ResultWithPartitions(None, Set.empty) + } + } + } + + private def selectPartitionToFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = { + // Only move one partition at a time to increase its catch-up rate and thus reduce the time spent on + // moving any given replica. Replicas are selected in ascending order (lexicographically by topic) from the + // partitions that are ready to fetch. Once selected, we will continue fetching the same partition until it + // becomes unavailable or is removed. + + inProgressPartition.foreach { tp => + val fetchStateOpt = partitionMap.get(tp) + fetchStateOpt.filter(_.isReadyForFetch).foreach { fetchState => + return Some((tp, fetchState)) + } + } + + inProgressPartition = None + + val nextPartitionOpt = nextReadyPartition(partitionMap) + nextPartitionOpt.foreach { case (tp, fetchState) => + inProgressPartition = Some(tp) + info(s"Beginning/resuming copy of partition $tp from offset ${fetchState.fetchOffset}. 
" + + s"Including this partition, there are ${partitionMap.size} remaining partitions to copy by this thread.") + } + nextPartitionOpt + } + + private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[ReplicaFetch]] = { + val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + val partitionsWithError = mutable.Set[TopicPartition]() + + try { + val logStartOffset = replicaMgr.futureLocalLogOrException(tp).logStartOffset + val lastFetchedEpoch = if (isTruncationOnFetchSupported) + fetchState.lastFetchedEpoch.map(_.asInstanceOf[Integer]).asJava + else + Optional.empty[Integer] + val topicId = fetchState.topicId.getOrElse(Uuid.ZERO_UUID) + requestMap.put(tp, new FetchRequest.PartitionData(topicId, fetchState.fetchOffset, logStartOffset, + fetchSize, Optional.of(fetchState.currentLeaderEpoch), lastFetchedEpoch)) + } catch { + case e: KafkaStorageException => + debug(s"Failed to build fetch for $tp", e) + partitionsWithError += tp + } + + val fetchRequestOpt = if (requestMap.isEmpty) { + None + } else { + val version: Short = if (fetchState.topicId.isEmpty) + 12 + else + ApiKeys.FETCH.latestVersion + // Set maxWait and minBytes to 0 because the response should return immediately if + // the future log has caught up with the current log of the partition + val requestBuilder = FetchRequest.Builder.forReplica(version, replicaId, 0, 0, requestMap).setMaxBytes(maxBytes) + Some(ReplicaFetch(requestMap, requestBuilder)) + } + + ResultWithPartitions(fetchRequestOpt, partitionsWithError) + } + + private def nextReadyPartition(partitionMap: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = { Review Comment: nit: `partitions` as well? ########## core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala: ########## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.cluster.BrokerEndPoint + +import java.util.{Collections, Optional} +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import kafka.utils.Implicits.MapExtensionMethods +import kafka.utils.Logging +import org.apache.kafka.clients.FetchSessionHandler +import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.{TopicPartition, Uuid} +import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderTopic, OffsetForLeaderTopicCollection} +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse} +import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2 + +import scala.jdk.CollectionConverters._ +import scala.collection.{Map, mutable} +import scala.compat.java8.OptionConverters.RichOptionForJava8 + +/** + * Facilitates fetches from a remote replica leader. + * + * @param logPrefix The log prefix + * @param blockingSender The raw leader endpoint used to communicate with the leader + * @param fetchSessionHandler A FetchSessionHandler to track the partitions in the session + * @param brokerConfig Broker configuration + * @param replicaMgr A ReplicaManager + * @param quota The quota, used when building a fetch request + */ +class RemoteLeaderEndPoint(logPrefix: String, + blockingSender: BlockingSend, + private[server] val fetchSessionHandler: FetchSessionHandler, // visible for testing + brokerConfig: KafkaConfig, + replicaMgr: ReplicaManager, + quota: ReplicaQuota) extends LeaderEndPoint with Logging { + + this.logIdent = logPrefix + + private val maxWait = brokerConfig.replicaFetchWaitMaxMs + private val minBytes = brokerConfig.replicaFetchMinBytes + private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes + private val fetchSize = brokerConfig.replicaFetchMaxBytes + + override val isTruncationOnFetchSupported = brokerConfig.interBrokerProtocolVersion.isTruncationOnFetchSupported + + override def initiateClose(): Unit = blockingSender.initiateClose() + + override def close(): Unit = blockingSender.close() + + override def brokerEndPoint(): BrokerEndPoint = blockingSender.brokerEndPoint() + + override def fetch(fetchRequest: FetchRequest.Builder): collection.Map[TopicPartition, FetchData] = { + val clientResponse = try { + blockingSender.sendRequest(fetchRequest) + } catch { + case t: Throwable => + fetchSessionHandler.handleError(t) + throw t + } + val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse] + if (!fetchSessionHandler.handleResponse(fetchResponse, clientResponse.requestHeader().apiVersion())) { + // If we had a session topic ID related error, throw it, otherwise return an empty fetch data map. 
+ if (fetchResponse.error == Errors.FETCH_SESSION_TOPIC_ID_ERROR) { + throw Errors.forCode(fetchResponse.error().code()).exception() + } else { + Map.empty + } + } else { + fetchResponse.responseData(fetchSessionHandler.sessionTopicNames, clientResponse.requestHeader().apiVersion()).asScala + } + } + + override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = { + fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_TIMESTAMP) + } + + override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = { + fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.LATEST_TIMESTAMP) + } + + private def fetchOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int, earliestOrLatest: Long): Long = { + val topic = new ListOffsetsTopic() + .setName(topicPartition.topic) + .setPartitions(Collections.singletonList( + new ListOffsetsPartition() + .setPartitionIndex(topicPartition.partition) + .setCurrentLeaderEpoch(currentLeaderEpoch) + .setTimestamp(earliestOrLatest))) + val requestBuilder = ListOffsetsRequest.Builder.forReplica(brokerConfig.listOffsetRequestVersion, brokerConfig.brokerId) + .setTargetTimes(Collections.singletonList(topic)) + + val clientResponse = blockingSender.sendRequest(requestBuilder) + val response = clientResponse.responseBody.asInstanceOf[ListOffsetsResponse] + val responsePartition = response.topics.asScala.find(_.name == topicPartition.topic).get + .partitions.asScala.find(_.partitionIndex == topicPartition.partition).get + + Errors.forCode(responsePartition.errorCode) match { + case Errors.NONE => + if (brokerConfig.interBrokerProtocolVersion.isAtLeast(IBP_0_10_1_IV2)) + responsePartition.offset + else + responsePartition.oldStyleOffsets.get(0) + case error => throw error.exception + } + } + + override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { + if (partitions.isEmpty) { + debug("Skipping leaderEpoch request since all partitions do not have an epoch") + return Map.empty + } + + val topics = new OffsetForLeaderTopicCollection(partitions.size) + partitions.forKeyValue { (topicPartition, epochData) => + var topic = topics.find(topicPartition.topic) + if (topic == null) { + topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic) + topics.add(topic) + } + topic.partitions.add(epochData) + } + + val epochRequest = OffsetsForLeaderEpochRequest.Builder.forFollower( + brokerConfig.offsetForLeaderEpochRequestVersion, topics, brokerConfig.brokerId) + debug(s"Sending offset for leader epoch request $epochRequest") + + try { + val response = blockingSender.sendRequest(epochRequest) + val responseBody = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse] + debug(s"Received leaderEpoch response $response") + responseBody.data.topics.asScala.flatMap { offsetForLeaderTopicResult => + offsetForLeaderTopicResult.partitions.asScala.map { offsetForLeaderPartitionResult => + val tp = new TopicPartition(offsetForLeaderTopicResult.topic, offsetForLeaderPartitionResult.partition) + tp -> offsetForLeaderPartitionResult + } + }.toMap + } catch { + case t: Throwable => + warn(s"Error when sending leader epoch request for $partitions", t) + + // if we get any unexpected exception, mark all partitions with an error + val error = Errors.forException(t) + partitions.map { case (tp, _) => + tp -> new EpochEndOffset() + .setPartition(tp.partition) + .setErrorCode(error.code) + } + } + } + + override def 
buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = { + val partitionsWithError = mutable.Set[TopicPartition]() + + val builder = fetchSessionHandler.newBuilder(partitionMap.size, false) + partitionMap.forKeyValue { (topicPartition, fetchState) => + // We will not include a replica in the fetch request if it should be throttled. + if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, fetchState, topicPartition)) { + try { + val logStartOffset = replicaMgr.localLogOrException(topicPartition).logStartOffset + val lastFetchedEpoch = if (isTruncationOnFetchSupported) + fetchState.lastFetchedEpoch.map(_.asInstanceOf[Integer]).asJava + else + Optional.empty[Integer] + builder.add(topicPartition, new FetchRequest.PartitionData( + fetchState.topicId.getOrElse(Uuid.ZERO_UUID), + fetchState.fetchOffset, + logStartOffset, + fetchSize, + Optional.of(fetchState.currentLeaderEpoch), + lastFetchedEpoch)) + } catch { + case _: KafkaStorageException => + // The replica has already been marked offline due to log directory failure and the original failure should have already been logged. + // This partition should be removed from ReplicaFetcherThread soon by ReplicaManager.handleLogDirFailure() + partitionsWithError += topicPartition + } + } + } + + val fetchData = builder.build() + val fetchRequestOpt = if (fetchData.sessionPartitions.isEmpty && fetchData.toForget.isEmpty) { + None + } else { + val version: Short = if (brokerConfig.fetchRequestVersion >= 13 && !fetchData.canUseTopicIds) 12 else brokerConfig.fetchRequestVersion + val requestBuilder = FetchRequest.Builder + .forReplica(version, brokerConfig.brokerId, maxWait, minBytes, fetchData.toSend) + .setMaxBytes(maxBytes) + .removed(fetchData.toForget) + .replaced(fetchData.toReplace) + .metadata(fetchData.metadata) + Some(ReplicaFetch(fetchData.sessionPartitions(), requestBuilder)) + } + + ResultWithPartitions(fetchRequestOpt, partitionsWithError) + } + + /** + * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list, + * the quota is exceeded and the replica is not in sync. + */ + private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = { + !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded + } + + override def toString: String = s"RemoteLeaderEndPoint with $blockingSender" Review Comment: nit: We usually structure `toString` like this: `RemoteLeaderEndPoint(blockingSender=$blockingSender)`. I am not sure if there is a reason to do it differently here. If there is, please ignore my comment. ########## core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.api.Request +import kafka.cluster.BrokerEndPoint +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import kafka.server.QuotaFactory.UnboundedQuota +import kafka.utils.Logging +import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} +import org.apache.kafka.common.message.FetchResponseData +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, RequestUtils} + +import java.util +import java.util.Optional +import scala.collection.{Map, Seq, Set, mutable} +import scala.compat.java8.OptionConverters.RichOptionForJava8 +import scala.jdk.CollectionConverters._ + +/** + * Facilitates fetches from a local replica leader. + * + * @param sourceBroker The broker (host:port) that we want to connect to + * @param brokerConfig A config file with broker related configurations + * @param replicaMgr A ReplicaManager + * @param quota The quota, used when building a fetch request + */ +class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint, + brokerConfig: KafkaConfig, + replicaMgr: ReplicaManager, + quota: ReplicaQuota) extends LeaderEndPoint with Logging { + + private val replicaId = brokerConfig.brokerId + private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes + private val fetchSize = brokerConfig.replicaFetchMaxBytes + private var inProgressPartition: Option[TopicPartition] = None + + override val isTruncationOnFetchSupported: Boolean = false + + override def initiateClose(): Unit = {} // do nothing + + override def close(): Unit = {} // do nothing + + override def brokerEndPoint(): BrokerEndPoint = sourceBroker + + override def fetch(fetchRequest: FetchRequest.Builder): collection.Map[TopicPartition, FetchData] = { + var partitionData: Seq[(TopicPartition, FetchData)] = null + val request = fetchRequest.build() + + // We can build the map from the request since it contains topic IDs and names. + // Only one ID can be associated with a name and vice versa. 
+ val topicNames = new mutable.HashMap[Uuid, String]() + request.data.topics.forEach { topic => + topicNames.put(topic.topicId, topic.topic) + } + + def processResponseCallback(responsePartitionData: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { + partitionData = responsePartitionData.map { case (tp, data) => + val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull + val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET) + tp.topicPartition -> new FetchResponseData.PartitionData() + .setPartitionIndex(tp.topicPartition.partition) + .setErrorCode(data.error.code) + .setHighWatermark(data.highWatermark) + .setLastStableOffset(lastStableOffset) + .setLogStartOffset(data.logStartOffset) + .setAbortedTransactions(abortedTransactions) + .setRecords(data.records) + } + } + + val fetchData = request.fetchData(topicNames.asJava) + + val fetchParams = FetchParams( + requestVersion = request.version, + maxWaitMs = 0L, // timeout is 0 so that the callback will be executed immediately + replicaId = Request.FutureLocalReplicaId, + minBytes = request.minBytes, + maxBytes = request.maxBytes, + isolation = FetchLogEnd, + clientMetadata = None + ) + + replicaMgr.fetchMessages( + params = fetchParams, + fetchInfos = fetchData.asScala.toSeq, + quota = UnboundedQuota, + responseCallback = processResponseCallback + ) + + if (partitionData == null) + throw new IllegalStateException(s"Failed to fetch data for partitions ${fetchData.keySet().toArray.mkString(",")}") + + partitionData.toMap + } + + override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = { + val partition = replicaMgr.getPartitionOrException(topicPartition) + partition.localLogOrException.logStartOffset + } + + override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = { + val partition = replicaMgr.getPartitionOrException(topicPartition) + partition.localLogOrException.logEndOffset + } + + override def fetchEpochEndOffsets(partitions: collection.Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { + partitions.map { case (tp, epochData) => + try { + val endOffset = if (epochData.leaderEpoch == UNDEFINED_EPOCH) { + new EpochEndOffset() + .setPartition(tp.partition) + .setErrorCode(Errors.NONE.code) + } else { + val partition = replicaMgr.getPartitionOrException(tp) + partition.lastOffsetForLeaderEpoch( + currentLeaderEpoch = RequestUtils.getLeaderEpoch(epochData.currentLeaderEpoch), + leaderEpoch = epochData.leaderEpoch, + fetchOnlyFromLeader = false) + } + tp -> endOffset + } catch { + case t: Throwable => + warn(s"Error when getting EpochEndOffset for $tp", t) + tp -> new EpochEndOffset() + .setPartition(tp.partition) + .setErrorCode(Errors.forException(t).code) + } + } + } + + override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = { + // Only include replica in the fetch request if it is not throttled. + if (quota.isQuotaExceeded) { + ResultWithPartitions(None, Set.empty) + } else { + selectPartitionToFetch(partitionMap) match { + case Some((tp, fetchState)) => + buildFetchForPartition(tp, fetchState) + case None => + ResultWithPartitions(None, Set.empty) + } + } + } + + private def selectPartitionToFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = { Review Comment: nit: ditto. 
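As a sketch of what the consistent parameter naming could look like (simplified, hypothetical stand-in types rather than the actual Kafka classes):

```scala
// Sketch only: simplified stand-in types, not the actual Kafka classes.
// Illustrates the suggested rename of `partitionMap` to `partitions`, so every
// per-partition map parameter on the endpoint uses the same name.
object NamingConsistencySketch {

  // Hypothetical stand-ins for TopicPartition, EpochData and PartitionFetchState.
  final case class TopicPartition(topic: String, partition: Int)
  final case class EpochData(leaderEpoch: Int)
  final case class PartitionFetchState(fetchOffset: Long, isReadyForFetch: Boolean)

  trait LeaderEndPointNaming {
    // Already named `partitions` in the PR.
    def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, Long]

    // Suggested: `partitions` instead of `partitionMap`, matching the method above.
    // Private helpers such as `selectPartitionToFetch` and `nextReadyPartition`
    // would follow the same rename.
    def buildFetch(partitions: Map[TopicPartition, PartitionFetchState]): Option[String]
  }
}
```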
########## core/src/main/scala/kafka/server/BrokerBlockingSender.scala: ########## @@ -124,4 +128,8 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint, def close(): Unit = { networkClient.close() } + + override def toString: String = { + s"BrokerBlockingSender($sourceBroker, fetcherId=$fetcherId)" Review Comment: nit: `sourceBroker=$sourceBroker` to be consistent? ########## core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.api.Request +import kafka.cluster.BrokerEndPoint +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import kafka.server.QuotaFactory.UnboundedQuota +import kafka.utils.Logging +import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} +import org.apache.kafka.common.message.FetchResponseData +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, RequestUtils} + +import java.util +import java.util.Optional +import scala.collection.{Map, Seq, Set, mutable} +import scala.compat.java8.OptionConverters.RichOptionForJava8 +import scala.jdk.CollectionConverters._ + +/** + * Facilitates fetches from a local replica leader. + * + * @param sourceBroker The broker (host:port) that we want to connect to + * @param brokerConfig A config file with broker related configurations + * @param replicaMgr A ReplicaManager + * @param quota The quota, used when building a fetch request + */ +class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint, + brokerConfig: KafkaConfig, + replicaMgr: ReplicaManager, Review Comment: nit: `replicaManager` seems better and more consistent with the rest of the code base. ########## core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.api.Request +import kafka.cluster.BrokerEndPoint +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import kafka.server.QuotaFactory.UnboundedQuota +import kafka.utils.Logging +import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} +import org.apache.kafka.common.message.FetchResponseData +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, RequestUtils} + +import java.util +import java.util.Optional +import scala.collection.{Map, Seq, Set, mutable} +import scala.compat.java8.OptionConverters.RichOptionForJava8 +import scala.jdk.CollectionConverters._ + +/** + * Facilitates fetches from a local replica leader. + * + * @param sourceBroker The broker (host:port) that we want to connect to + * @param brokerConfig A config file with broker related configurations + * @param replicaMgr A ReplicaManager + * @param quota The quota, used when building a fetch request + */ +class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint, + brokerConfig: KafkaConfig, + replicaMgr: ReplicaManager, + quota: ReplicaQuota) extends LeaderEndPoint with Logging { + + private val replicaId = brokerConfig.brokerId + private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes + private val fetchSize = brokerConfig.replicaFetchMaxBytes + private var inProgressPartition: Option[TopicPartition] = None + + override val isTruncationOnFetchSupported: Boolean = false + + override def initiateClose(): Unit = {} // do nothing + + override def close(): Unit = {} // do nothing + + override def brokerEndPoint(): BrokerEndPoint = sourceBroker + + override def fetch(fetchRequest: FetchRequest.Builder): collection.Map[TopicPartition, FetchData] = { + var partitionData: Seq[(TopicPartition, FetchData)] = null + val request = fetchRequest.build() + + // We can build the map from the request since it contains topic IDs and names. + // Only one ID can be associated with a name and vice versa. 
+ val topicNames = new mutable.HashMap[Uuid, String]() + request.data.topics.forEach { topic => + topicNames.put(topic.topicId, topic.topic) + } + + def processResponseCallback(responsePartitionData: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { + partitionData = responsePartitionData.map { case (tp, data) => + val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull + val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET) + tp.topicPartition -> new FetchResponseData.PartitionData() + .setPartitionIndex(tp.topicPartition.partition) + .setErrorCode(data.error.code) + .setHighWatermark(data.highWatermark) + .setLastStableOffset(lastStableOffset) + .setLogStartOffset(data.logStartOffset) + .setAbortedTransactions(abortedTransactions) + .setRecords(data.records) + } + } + + val fetchData = request.fetchData(topicNames.asJava) + + val fetchParams = FetchParams( + requestVersion = request.version, + maxWaitMs = 0L, // timeout is 0 so that the callback will be executed immediately + replicaId = Request.FutureLocalReplicaId, + minBytes = request.minBytes, + maxBytes = request.maxBytes, + isolation = FetchLogEnd, + clientMetadata = None + ) + + replicaMgr.fetchMessages( + params = fetchParams, + fetchInfos = fetchData.asScala.toSeq, + quota = UnboundedQuota, + responseCallback = processResponseCallback + ) + + if (partitionData == null) + throw new IllegalStateException(s"Failed to fetch data for partitions ${fetchData.keySet().toArray.mkString(",")}") + + partitionData.toMap + } + + override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = { + val partition = replicaMgr.getPartitionOrException(topicPartition) + partition.localLogOrException.logStartOffset + } + + override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = { + val partition = replicaMgr.getPartitionOrException(topicPartition) + partition.localLogOrException.logEndOffset + } + + override def fetchEpochEndOffsets(partitions: collection.Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { + partitions.map { case (tp, epochData) => + try { + val endOffset = if (epochData.leaderEpoch == UNDEFINED_EPOCH) { + new EpochEndOffset() + .setPartition(tp.partition) + .setErrorCode(Errors.NONE.code) + } else { + val partition = replicaMgr.getPartitionOrException(tp) + partition.lastOffsetForLeaderEpoch( + currentLeaderEpoch = RequestUtils.getLeaderEpoch(epochData.currentLeaderEpoch), + leaderEpoch = epochData.leaderEpoch, + fetchOnlyFromLeader = false) + } + tp -> endOffset + } catch { + case t: Throwable => + warn(s"Error when getting EpochEndOffset for $tp", t) + tp -> new EpochEndOffset() + .setPartition(tp.partition) + .setErrorCode(Errors.forException(t).code) + } + } + } + + override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = { Review Comment: nit: `partitions`? ########## core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala: ########## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.cluster.BrokerEndPoint + +import java.util.{Collections, Optional} +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import kafka.utils.Implicits.MapExtensionMethods +import kafka.utils.Logging +import org.apache.kafka.clients.FetchSessionHandler +import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.{TopicPartition, Uuid} +import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderTopic, OffsetForLeaderTopicCollection} +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse} +import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2 + +import scala.jdk.CollectionConverters._ +import scala.collection.{Map, mutable} +import scala.compat.java8.OptionConverters.RichOptionForJava8 + +/** + * Facilitates fetches from a remote replica leader. + * + * @param logPrefix The log prefix + * @param blockingSender The raw leader endpoint used to communicate with the leader + * @param fetchSessionHandler A FetchSessionHandler to track the partitions in the session + * @param brokerConfig Broker configuration + * @param replicaMgr A ReplicaManager + * @param quota The quota, used when building a fetch request + */ +class RemoteLeaderEndPoint(logPrefix: String, + blockingSender: BlockingSend, + private[server] val fetchSessionHandler: FetchSessionHandler, // visible for testing + brokerConfig: KafkaConfig, + replicaMgr: ReplicaManager, Review Comment: nit: `replicaManager`? ########## core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.api.Request +import kafka.cluster.BrokerEndPoint +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import kafka.server.QuotaFactory.UnboundedQuota +import kafka.utils.Logging +import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} +import org.apache.kafka.common.message.FetchResponseData +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, RequestUtils} + +import java.util +import java.util.Optional +import scala.collection.{Map, Seq, Set, mutable} +import scala.compat.java8.OptionConverters.RichOptionForJava8 +import scala.jdk.CollectionConverters._ + +/** + * Facilitates fetches from a local replica leader. + * + * @param sourceBroker The broker (host:port) that we want to connect to + * @param brokerConfig A config file with broker related configurations + * @param replicaMgr A ReplicaManager + * @param quota The quota, used when building a fetch request + */ +class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint, + brokerConfig: KafkaConfig, + replicaMgr: ReplicaManager, + quota: ReplicaQuota) extends LeaderEndPoint with Logging { + + private val replicaId = brokerConfig.brokerId + private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes + private val fetchSize = brokerConfig.replicaFetchMaxBytes + private var inProgressPartition: Option[TopicPartition] = None + + override val isTruncationOnFetchSupported: Boolean = false + + override def initiateClose(): Unit = {} // do nothing + + override def close(): Unit = {} // do nothing + + override def brokerEndPoint(): BrokerEndPoint = sourceBroker + + override def fetch(fetchRequest: FetchRequest.Builder): collection.Map[TopicPartition, FetchData] = { + var partitionData: Seq[(TopicPartition, FetchData)] = null + val request = fetchRequest.build() + + // We can build the map from the request since it contains topic IDs and names. + // Only one ID can be associated with a name and vice versa. 
+ val topicNames = new mutable.HashMap[Uuid, String]() + request.data.topics.forEach { topic => + topicNames.put(topic.topicId, topic.topic) + } + + def processResponseCallback(responsePartitionData: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { + partitionData = responsePartitionData.map { case (tp, data) => + val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull + val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET) + tp.topicPartition -> new FetchResponseData.PartitionData() + .setPartitionIndex(tp.topicPartition.partition) + .setErrorCode(data.error.code) + .setHighWatermark(data.highWatermark) + .setLastStableOffset(lastStableOffset) + .setLogStartOffset(data.logStartOffset) + .setAbortedTransactions(abortedTransactions) + .setRecords(data.records) + } + } + + val fetchData = request.fetchData(topicNames.asJava) + + val fetchParams = FetchParams( + requestVersion = request.version, + maxWaitMs = 0L, // timeout is 0 so that the callback will be executed immediately + replicaId = Request.FutureLocalReplicaId, + minBytes = request.minBytes, + maxBytes = request.maxBytes, + isolation = FetchLogEnd, + clientMetadata = None + ) + + replicaMgr.fetchMessages( + params = fetchParams, + fetchInfos = fetchData.asScala.toSeq, + quota = UnboundedQuota, + responseCallback = processResponseCallback + ) + + if (partitionData == null) + throw new IllegalStateException(s"Failed to fetch data for partitions ${fetchData.keySet().toArray.mkString(",")}") + + partitionData.toMap + } + + override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = { + val partition = replicaMgr.getPartitionOrException(topicPartition) + partition.localLogOrException.logStartOffset + } + + override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = { + val partition = replicaMgr.getPartitionOrException(topicPartition) + partition.localLogOrException.logEndOffset + } + + override def fetchEpochEndOffsets(partitions: collection.Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { + partitions.map { case (tp, epochData) => + try { + val endOffset = if (epochData.leaderEpoch == UNDEFINED_EPOCH) { + new EpochEndOffset() + .setPartition(tp.partition) + .setErrorCode(Errors.NONE.code) + } else { + val partition = replicaMgr.getPartitionOrException(tp) + partition.lastOffsetForLeaderEpoch( + currentLeaderEpoch = RequestUtils.getLeaderEpoch(epochData.currentLeaderEpoch), + leaderEpoch = epochData.leaderEpoch, + fetchOnlyFromLeader = false) + } + tp -> endOffset + } catch { + case t: Throwable => + warn(s"Error when getting EpochEndOffset for $tp", t) + tp -> new EpochEndOffset() + .setPartition(tp.partition) + .setErrorCode(Errors.forException(t).code) + } + } + } + + override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = { + // Only include replica in the fetch request if it is not throttled. 
+ if (quota.isQuotaExceeded) { + ResultWithPartitions(None, Set.empty) + } else { + selectPartitionToFetch(partitionMap) match { + case Some((tp, fetchState)) => + buildFetchForPartition(tp, fetchState) + case None => + ResultWithPartitions(None, Set.empty) + } + } + } + + private def selectPartitionToFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = { + // Only move one partition at a time to increase its catch-up rate and thus reduce the time spent on + // moving any given replica. Replicas are selected in ascending order (lexicographically by topic) from the + // partitions that are ready to fetch. Once selected, we will continue fetching the same partition until it + // becomes unavailable or is removed. + + inProgressPartition.foreach { tp => + val fetchStateOpt = partitionMap.get(tp) + fetchStateOpt.filter(_.isReadyForFetch).foreach { fetchState => + return Some((tp, fetchState)) + } + } + + inProgressPartition = None + + val nextPartitionOpt = nextReadyPartition(partitionMap) + nextPartitionOpt.foreach { case (tp, fetchState) => + inProgressPartition = Some(tp) + info(s"Beginning/resuming copy of partition $tp from offset ${fetchState.fetchOffset}. " + + s"Including this partition, there are ${partitionMap.size} remaining partitions to copy by this thread.") + } + nextPartitionOpt + } + + private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[ReplicaFetch]] = { Review Comment: nit: `topicPartition` instead of `tp`? ########## core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala: ########## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.cluster.BrokerEndPoint + +import java.util.{Collections, Optional} +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import kafka.utils.Implicits.MapExtensionMethods +import kafka.utils.Logging +import org.apache.kafka.clients.FetchSessionHandler +import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.{TopicPartition, Uuid} +import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderTopic, OffsetForLeaderTopicCollection} +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse} +import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2 + +import scala.jdk.CollectionConverters._ +import scala.collection.{Map, mutable} +import scala.compat.java8.OptionConverters.RichOptionForJava8 + +/** + * Facilitates fetches from a remote replica leader. + * + * @param logPrefix The log prefix + * @param blockingSender The raw leader endpoint used to communicate with the leader + * @param fetchSessionHandler A FetchSessionHandler to track the partitions in the session + * @param brokerConfig Broker configuration + * @param replicaMgr A ReplicaManager + * @param quota The quota, used when building a fetch request + */ +class RemoteLeaderEndPoint(logPrefix: String, + blockingSender: BlockingSend, + private[server] val fetchSessionHandler: FetchSessionHandler, // visible for testing + brokerConfig: KafkaConfig, + replicaMgr: ReplicaManager, + quota: ReplicaQuota) extends LeaderEndPoint with Logging { + + this.logIdent = logPrefix + + private val maxWait = brokerConfig.replicaFetchWaitMaxMs + private val minBytes = brokerConfig.replicaFetchMinBytes + private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes + private val fetchSize = brokerConfig.replicaFetchMaxBytes + + override val isTruncationOnFetchSupported = brokerConfig.interBrokerProtocolVersion.isTruncationOnFetchSupported + + override def initiateClose(): Unit = blockingSender.initiateClose() + + override def close(): Unit = blockingSender.close() + + override def brokerEndPoint(): BrokerEndPoint = blockingSender.brokerEndPoint() + + override def fetch(fetchRequest: FetchRequest.Builder): collection.Map[TopicPartition, FetchData] = { + val clientResponse = try { + blockingSender.sendRequest(fetchRequest) + } catch { + case t: Throwable => + fetchSessionHandler.handleError(t) + throw t + } + val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse] + if (!fetchSessionHandler.handleResponse(fetchResponse, clientResponse.requestHeader().apiVersion())) { + // If we had a session topic ID related error, throw it, otherwise return an empty fetch data map. 
+ if (fetchResponse.error == Errors.FETCH_SESSION_TOPIC_ID_ERROR) { + throw Errors.forCode(fetchResponse.error().code()).exception() + } else { + Map.empty + } + } else { + fetchResponse.responseData(fetchSessionHandler.sessionTopicNames, clientResponse.requestHeader().apiVersion()).asScala + } + } + + override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = { + fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_TIMESTAMP) + } + + override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = { + fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.LATEST_TIMESTAMP) + } + + private def fetchOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int, earliestOrLatest: Long): Long = { + val topic = new ListOffsetsTopic() + .setName(topicPartition.topic) + .setPartitions(Collections.singletonList( + new ListOffsetsPartition() + .setPartitionIndex(topicPartition.partition) + .setCurrentLeaderEpoch(currentLeaderEpoch) + .setTimestamp(earliestOrLatest))) + val requestBuilder = ListOffsetsRequest.Builder.forReplica(brokerConfig.listOffsetRequestVersion, brokerConfig.brokerId) + .setTargetTimes(Collections.singletonList(topic)) + + val clientResponse = blockingSender.sendRequest(requestBuilder) + val response = clientResponse.responseBody.asInstanceOf[ListOffsetsResponse] + val responsePartition = response.topics.asScala.find(_.name == topicPartition.topic).get + .partitions.asScala.find(_.partitionIndex == topicPartition.partition).get + + Errors.forCode(responsePartition.errorCode) match { + case Errors.NONE => + if (brokerConfig.interBrokerProtocolVersion.isAtLeast(IBP_0_10_1_IV2)) + responsePartition.offset + else + responsePartition.oldStyleOffsets.get(0) + case error => throw error.exception + } + } + + override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { + if (partitions.isEmpty) { + debug("Skipping leaderEpoch request since all partitions do not have an epoch") + return Map.empty + } + + val topics = new OffsetForLeaderTopicCollection(partitions.size) + partitions.forKeyValue { (topicPartition, epochData) => + var topic = topics.find(topicPartition.topic) + if (topic == null) { + topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic) + topics.add(topic) + } + topic.partitions.add(epochData) + } + + val epochRequest = OffsetsForLeaderEpochRequest.Builder.forFollower( + brokerConfig.offsetForLeaderEpochRequestVersion, topics, brokerConfig.brokerId) + debug(s"Sending offset for leader epoch request $epochRequest") + + try { + val response = blockingSender.sendRequest(epochRequest) + val responseBody = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse] + debug(s"Received leaderEpoch response $response") + responseBody.data.topics.asScala.flatMap { offsetForLeaderTopicResult => + offsetForLeaderTopicResult.partitions.asScala.map { offsetForLeaderPartitionResult => + val tp = new TopicPartition(offsetForLeaderTopicResult.topic, offsetForLeaderPartitionResult.partition) + tp -> offsetForLeaderPartitionResult + } + }.toMap + } catch { + case t: Throwable => + warn(s"Error when sending leader epoch request for $partitions", t) + + // if we get any unexpected exception, mark all partitions with an error + val error = Errors.forException(t) + partitions.map { case (tp, _) => + tp -> new EpochEndOffset() + .setPartition(tp.partition) + .setErrorCode(error.code) + } + } + } + + override def 
buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = { Review Comment: nit: `partitions`?
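Separately, a minimal sketch of the `toString` structure suggested earlier in this review (`ClassName(field=value)`), using hypothetical stand-in classes rather than the real ones:

```scala
// Sketch only: hypothetical field types and names, not the actual Kafka classes.
// Shows the `ClassName(field=value, ...)` convention suggested for the
// RemoteLeaderEndPoint and BrokerBlockingSender toString implementations.
object ToStringSketch {

  final class BrokerBlockingSenderLike(sourceBroker: String, fetcherId: Int) {
    override def toString: String =
      s"BrokerBlockingSender(sourceBroker=$sourceBroker, fetcherId=$fetcherId)"
  }

  final class RemoteLeaderEndPointLike(blockingSender: BrokerBlockingSenderLike) {
    override def toString: String =
      s"RemoteLeaderEndPoint(blockingSender=$blockingSender)"
  }
}
```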