[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r494520245 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -1764,6 +1769,145 @@ class KafkaController(val config: KafkaConfig, } } + def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = { +val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]() + +alterIsrRequest.topics.forEach { topicReq => + topicReq.partitions.forEach { partitionReq => +val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex) +val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt) +isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion)) + } +} + +def responseCallback(results: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]): Unit = { + val resp = new AlterIsrResponseData() + results match { +case Right(error) => + resp.setErrorCode(error.code) +case Left(partitionResults) => + resp.setTopics(new util.ArrayList()) + partitionResults +.groupBy { case (tp, _) => tp.topic } // Group by topic +.foreach { case (topic, partitions) => + // Add each topic part to the response + val topicResp = new AlterIsrResponseData.TopicData() +.setName(topic) +.setPartitions(new util.ArrayList()) + resp.topics.add(topicResp) + partitions.foreach { case (tp, errorOrIsr) => +// Add each partition part to the response (new ISR or error) +errorOrIsr match { + case Left(error) => topicResp.partitions.add( +new AlterIsrResponseData.PartitionData() + .setPartitionIndex(tp.partition) + .setErrorCode(error.code)) + case Right(leaderAndIsr) => topicResp.partitions.add( +new AlterIsrResponseData.PartitionData() + .setPartitionIndex(tp.partition) + .setLeaderId(leaderAndIsr.leader) + .setLeaderEpoch(leaderAndIsr.leaderEpoch) + .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava) + 
.setCurrentIsrVersion(leaderAndIsr.zkVersion)) +} +} + } + } + callback.apply(resp) +} + +eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback)) + } + + private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr], + callback: AlterIsrCallback): Unit = { + +// Handle a few short-circuits +if (!isActive) { + callback.apply(Right(Errors.NOT_CONTROLLER)) + return +} + +val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId) +if (brokerEpochOpt.isEmpty) { + info(s"Ignoring AlterIsr due to unknown broker $brokerId") + callback.apply(Right(Errors.STALE_BROKER_EPOCH)) + return +} + +if (!brokerEpochOpt.contains(brokerEpoch)) { + info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for broker $brokerId") + callback.apply(Right(Errors.STALE_BROKER_EPOCH)) + return +} + +val response = try { + val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]() + + // Determine which partitions we will accept the new ISR for + val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap { +case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) => + val partitionError: Errors = controllerContext.partitionLeadershipInfo(tp) match { +case Some(leaderIsrAndControllerEpoch) => + val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr + if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) { Review comment: I was trying to think of some kind of race with a zombie leader trying to update the ISR; however, this would get fenced by the leader epoch.
This should be pretty easy to add ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -1246,6 +1351,51 @@ class Partition(val topicPartition: TopicPartition, } } + private def sendAlterIsrRequest(proposedIsrState: IsrState): Boolean = { +val isrToSendOpt: Option[Set[Int]] = proposedIsrState match { + case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + newInSyncReplicaId) + case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- outOfSyncReplicaIds) + case CommittedIsr(_) => +error(s"Asked to send AlterIsr but there are no pending updates") +
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r494523773 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -1246,6 +1351,51 @@ class Partition(val topicPartition: TopicPartition, } } + private def sendAlterIsrRequest(proposedIsrState: IsrState): Boolean = { +val isrToSendOpt: Option[Set[Int]] = proposedIsrState match { + case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + newInSyncReplicaId) + case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- outOfSyncReplicaIds) + case CommittedIsr(_) => +error(s"Asked to send AlterIsr but there are no pending updates") +None +} +isrToSendOpt.exists { isrToSend => + val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion) + val callbackPartial = handleAlterIsrResponse(isrToSend, _ : Either[Errors, LeaderAndIsr]) + alterIsrManager.enqueue(AlterIsrItem(topicPartition, newLeaderAndIsr, callbackPartial)) +} + } + + private def handleAlterIsrResponse(proposedIsr: Set[Int], result: Either[Errors, LeaderAndIsr]): Unit = { +inWriteLock(leaderIsrUpdateLock) { + result match { +case Left(error: Errors) => error match { + case Errors.UNKNOWN_TOPIC_OR_PARTITION => +debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} since it doesn't know about this topic or partition. Giving up.") + case Errors.FENCED_LEADER_EPOCH => +debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.") + case Errors.INVALID_UPDATE_VERSION => +debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to invalid zk version. Retrying.") +sendAlterIsrRequest(isrState) + case _ => +warn(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to $error. Retrying.") +sendAlterIsrRequest(isrState) Review comment: True, we could see a new ISR from controller-initiated changes via LeaderAndIsr while our request is in-flight.
We have a check for this on successful responses, but we should also check here. Since our request failed, we don't have a leaderEpoch to check against, so I think the best we can do is see if `isrState` is still pending before re-sending the request. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r490594335 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -618,9 +682,9 @@ class Partition(val topicPartition: TopicPartition, // since the replica's logStartOffset may have incremented val leaderLWIncremented = newLeaderLW > oldLeaderLW -// check if we need to expand ISR to include this replica -// if it is not in the ISR yet -if (!inSyncReplicaIds.contains(followerId)) +// Check if this in-sync replica needs to be added to the ISR. We look at the "maximal" ISR here so we don't +// send an additional Alter ISR request for the same replica Review comment: Yeah, checking the maximal set isn't needed anymore since adding the sealed trait. I'll just update this to simply call `maybeExpandIsr`, which will do the check you propose here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r490562630 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -920,7 +986,7 @@ class Partition(val topicPartition: TopicPartition, * is violated, that replica is considered to be out of sync * **/ -val candidateReplicaIds = inSyncReplicaIds - localBrokerId +val candidateReplicaIds = isrState.maximalIsr - localBrokerId Review comment: Also, yes, it's confusing to refer to `maximalIsr` here, even though it should always equal the committed ISR at this point (assuming we check for in-flight requests first). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r490561028 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -920,7 +986,7 @@ class Partition(val topicPartition: TopicPartition, * is violated, that replica is considered to be out of sync * **/ -val candidateReplicaIds = inSyncReplicaIds - localBrokerId +val candidateReplicaIds = isrState.maximalIsr - localBrokerId Review comment: Makes sense; that will also satisfy your other comment about not checking for in-flight requests within the write lock. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r490542749 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -200,9 +241,11 @@ class Partition(val topicPartition: TopicPartition, // defined when this broker is leader for partition @volatile private var leaderEpochStartOffsetOpt: Option[Long] = None @volatile var leaderReplicaIdOpt: Option[Int] = None - @volatile var inSyncReplicaIds = Set.empty[Int] + @volatile var isrState: IsrState = CommittedIsr(Set.empty) Review comment: Yeah, I was thinking we should move the ISR to a separate public accessor. I'll change this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r489756083 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1317,7 +1326,9 @@ class ReplicaManager(val config: KafkaConfig, partitionOpt.foreach { partition => val currentLeaderEpoch = partition.getLeaderEpoch val requestLeaderEpoch = partitionState.leaderEpoch - if (requestLeaderEpoch > currentLeaderEpoch) { + val currentZkVersion = partition.getZkVersion + val requestZkVersion = partitionState.zkVersion + if (requestLeaderEpoch > currentLeaderEpoch || requestZkVersion > currentZkVersion) { Review comment: No, we don't need this anymore. This was added so a LeaderAndIsr could update the Partition state without a leader epoch bump, but we don't have that flow anymore so we can revert this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r489753946 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -3045,6 +3045,24 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleAlterIsrRequest(request: RequestChannel.Request): Unit = { +val alterIsrRequest = request.body[AlterIsrRequest] + +if (!controller.isActive) { Review comment: This is actually a really good point. I filed a JIRA to fix this in other places in KafkaApis (https://issues.apache.org/jira/browse/KAFKA-10491). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r489747101 ## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala ## @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import java.util +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} +import org.apache.kafka.common.utils.Time + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrManager { + def start(): Unit + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Boolean + + def clearPending(topicPartition: TopicPartition): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr, callback: Either[Errors, LeaderAndIsr] => Unit) + +class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val time: Time, + val brokerId: Int, + val brokerEpochSupplier: () => Long) extends AlterIsrManager with Logging with KafkaMetricsGroup { + + // Used to allow only one pending ISR update per partition + private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]() + + // Used to allow only one in-flight request at a time + private val inflightRequest: AtomicBoolean = new AtomicBoolean(false) + + private val lastIsrPropagationMs = new AtomicLong(0) + + override def start(): Unit = { +scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS) + } + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Boolean = { +unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +unsentIsrUpdates.remove(topicPartition) + } + + private def propagateIsrChanges(): Unit = { +if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) { + // Copy current unsent ISRs but don't remove from the map + val inflightAlterIsrItems: mutable.Queue[AlterIsrItem] = new mutable.Queue[AlterIsrItem]() + unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.enqueue(item)) + + val now = time.milliseconds() + lastIsrPropagationMs.set(now) + + 
buildAndSendRequest(inflightAlterIsrItems.toSeq) +} + } + + def buildAndSendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = { +val message = new AlterIsrRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpochSupplier.apply()) + .setTopics(new util.ArrayList()) + +inflightAlterIsrItems.groupBy(_.topicPartition.topic).foreach(entry => { + val topicPart = new AlterIsrRequestData.TopicData() +.setName(entry._1) +.setPartitions(new util.ArrayList()) + message.topics().add(topicPart) + entry._2.foreach(item => { +topicPart.partitions().add(new AlterIsrRequestData.PartitionData() + .setPartitionIndex(item.topicPartition.partition) + .setLeaderId(item.leaderAndIsr.leader) + .setLeaderEpoch(item.leaderAndIsr.leaderEpoch) + .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava) + .setCurrentIsrVersion(item.leaderAndIsr.zkVersion) +)
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r489745539 ## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala ## @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import java.util +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} +import org.apache.kafka.common.utils.Time + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrManager { + def start(): Unit + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Boolean + + def clearPending(topicPartition: TopicPartition): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr, callback: Either[Errors, LeaderAndIsr] => Unit) + +class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val time: Time, + val brokerId: Int, + val brokerEpochSupplier: () => Long) extends AlterIsrManager with Logging with KafkaMetricsGroup { + + // Used to allow only one pending ISR update per partition + private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]() + + // Used to allow only one in-flight request at a time + private val inflightRequest: AtomicBoolean = new AtomicBoolean(false) + + private val lastIsrPropagationMs = new AtomicLong(0) + + override def start(): Unit = { +scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS) + } + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Boolean = { +unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +unsentIsrUpdates.remove(topicPartition) + } + + private def propagateIsrChanges(): Unit = { +if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) { + // Copy current unsent ISRs but don't remove from the map + val inflightAlterIsrItems: mutable.Queue[AlterIsrItem] = new mutable.Queue[AlterIsrItem]() + unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.enqueue(item)) + + val now = time.milliseconds() + lastIsrPropagationMs.set(now) + + 
buildAndSendRequest(inflightAlterIsrItems.toSeq) +} + } + + def buildAndSendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = { +val message = new AlterIsrRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpochSupplier.apply()) + .setTopics(new util.ArrayList()) + +inflightAlterIsrItems.groupBy(_.topicPartition.topic).foreach(entry => { + val topicPart = new AlterIsrRequestData.TopicData() +.setName(entry._1) +.setPartitions(new util.ArrayList()) + message.topics().add(topicPart) + entry._2.foreach(item => { +topicPart.partitions().add(new AlterIsrRequestData.PartitionData() + .setPartitionIndex(item.topicPartition.partition) + .setLeaderId(item.leaderAndIsr.leader) + .setLeaderEpoch(item.leaderAndIsr.leaderEpoch) + .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava) + .setCurrentIsrVersion(item.leaderAndIsr.zkVersion) +)
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r489744561 ## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala ## @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import java.util +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} +import org.apache.kafka.common.utils.Time + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrManager { + def start(): Unit + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Boolean + + def clearPending(topicPartition: TopicPartition): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr, callback: Either[Errors, LeaderAndIsr] => Unit) + +class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val time: Time, + val brokerId: Int, + val brokerEpochSupplier: () => Long) extends AlterIsrManager with Logging with KafkaMetricsGroup { + + // Used to allow only one pending ISR update per partition + private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]() + + // Used to allow only one in-flight request at a time + private val inflightRequest: AtomicBoolean = new AtomicBoolean(false) + + private val lastIsrPropagationMs = new AtomicLong(0) + + override def start(): Unit = { +scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS) + } + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Boolean = { +unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +unsentIsrUpdates.remove(topicPartition) + } + + private def propagateIsrChanges(): Unit = { +if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) { + // Copy current unsent ISRs but don't remove from the map + val inflightAlterIsrItems: mutable.Queue[AlterIsrItem] = new mutable.Queue[AlterIsrItem]() + unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.enqueue(item)) + + val now = time.milliseconds() + lastIsrPropagationMs.set(now) + + 
buildAndSendRequest(inflightAlterIsrItems.toSeq) +} + } + + def buildAndSendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = { +val message = new AlterIsrRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpochSupplier.apply()) + .setTopics(new util.ArrayList()) + +inflightAlterIsrItems.groupBy(_.topicPartition.topic).foreach(entry => { + val topicPart = new AlterIsrRequestData.TopicData() +.setName(entry._1) +.setPartitions(new util.ArrayList()) + message.topics().add(topicPart) + entry._2.foreach(item => { +topicPart.partitions().add(new AlterIsrRequestData.PartitionData() + .setPartitionIndex(item.topicPartition.partition) + .setLeaderId(item.leaderAndIsr.leader) + .setLeaderEpoch(item.leaderAndIsr.leaderEpoch) + .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava) + .setCurrentIsrVersion(item.leaderAndIsr.zkVersion) +)
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r489740479 ## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala ## @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import java.util +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} +import org.apache.kafka.common.utils.Time + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrManager { + def start(): Unit + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Boolean + + def clearPending(topicPartition: TopicPartition): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr, callback: Either[Errors, LeaderAndIsr] => Unit) + +class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val time: Time, + val brokerId: Int, + val brokerEpochSupplier: () => Long) extends AlterIsrManager with Logging with KafkaMetricsGroup { + + // Used to allow only one pending ISR update per partition + private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]() + + // Used to allow only one in-flight request at a time + private val inflightRequest: AtomicBoolean = new AtomicBoolean(false) + + private val lastIsrPropagationMs = new AtomicLong(0) + + override def start(): Unit = { +scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS) Review comment: Currently, we impose a 2.5s delay for the old ZK-based ISR propagation method. We could probably increase this 50ms interval up to a few hundred milliseconds without any ill effects. We still benefit from the fact that we assume the maximal ISR immediately. How about 200ms? Longer term, we can look into having a single thread that sits in a while loop trying to consume from a LinkedBlockingQueue or maybe even a SynchronousQueue. But agreed, we should leave this for later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r489730265 ## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala ## @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import java.util +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} +import org.apache.kafka.common.utils.Time + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrManager { + def start(): Unit + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Boolean + + def clearPending(topicPartition: TopicPartition): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr, callback: Either[Errors, LeaderAndIsr] => Unit) + +class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val time: Time, + val brokerId: Int, + val brokerEpochSupplier: () => Long) extends AlterIsrManager with Logging with KafkaMetricsGroup { + + // Used to allow only one pending ISR update per partition + private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]() + + // Used to allow only one in-flight request at a time + private val inflightRequest: AtomicBoolean = new AtomicBoolean(false) + + private val lastIsrPropagationMs = new AtomicLong(0) + + override def start(): Unit = { +scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS) + } + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Boolean = { +unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +unsentIsrUpdates.remove(topicPartition) + } + + private def propagateIsrChanges(): Unit = { +if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) { + // Copy current unsent ISRs but don't remove from the map + val inflightAlterIsrItems: mutable.Queue[AlterIsrItem] = new mutable.Queue[AlterIsrItem]() + unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.enqueue(item)) + + val now = time.milliseconds() + lastIsrPropagationMs.set(now) + + 
buildAndSendRequest(inflightAlterIsrItems.toSeq) +} + } + + def buildAndSendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = { +val message = new AlterIsrRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpochSupplier.apply()) + .setTopics(new util.ArrayList()) + +inflightAlterIsrItems.groupBy(_.topicPartition.topic).foreach(entry => { + val topicPart = new AlterIsrRequestData.TopicData() +.setName(entry._1) +.setPartitions(new util.ArrayList()) + message.topics().add(topicPart) + entry._2.foreach(item => { +topicPart.partitions().add(new AlterIsrRequestData.PartitionData() + .setPartitionIndex(item.topicPartition.partition) + .setLeaderId(item.leaderAndIsr.leader) + .setLeaderEpoch(item.leaderAndIsr.leaderEpoch) + .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava) + .setCurrentIsrVersion(item.leaderAndIsr.zkVersion) +)
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r489721316 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -617,9 +665,9 @@ class Partition(val topicPartition: TopicPartition, // since the replica's logStartOffset may have incremented val leaderLWIncremented = newLeaderLW > oldLeaderLW -// check if we need to expand ISR to include this replica -// if it is not in the ISR yet -if (!inSyncReplicaIds.contains(followerId)) +// Check if this in-sync replica needs to be added to the ISR. We look at the "maximal" ISR here so we don't Review comment: "maximal" works for me 👍 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r489720971 ## File path: config/log4j.properties ## @@ -76,8 +76,8 @@ log4j.additivity.kafka.request.logger=false log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender log4j.additivity.kafka.network.RequestChannel$=false -log4j.logger.kafka.controller=TRACE, controllerAppender -log4j.additivity.kafka.controller=false +log4j.logger.kafka.controller=DEBUG, controllerAppender Review comment: Yup, my mistake, shouldn't have been committed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r489703810 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -1210,6 +1317,37 @@ class Partition(val topicPartition: TopicPartition, } } + private def sendAlterIsrRequest(newIsr: Set[Int]): Boolean = { +val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion) +alterIsrManager.enqueueIsrUpdate(AlterIsrItem(topicPartition, newLeaderAndIsr, result => { + inWriteLock(leaderIsrUpdateLock) { +result match { + case Left(error: Errors) => error match { +case Errors.UNKNOWN_TOPIC_OR_PARTITION => + debug(s"Controller failed to update ISR for $topicPartition since it doesn't know about this partition. Giving up.") +case Errors.FENCED_LEADER_EPOCH => + debug(s"Controller failed to update ISR for $topicPartition since we sent an old leader epoch. Giving up.") +case _ => + pendingInSyncReplicaIds = None Review comment: I think this has been a long-standing bad assumption on my part in this PR. I've been (mis)treating `pendingInSyncReplicaIds` as a mechanism for initiating a retry along with its other semantics. You're right though, explicitly re-sending the ISR is definitely better. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r486535868 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -1210,19 +1243,66 @@ class Partition(val topicPartition: TopicPartition, } } - private def expandIsr(newIsr: Set[Int]): Unit = { -val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion) + private[cluster] def expandIsr(newInSyncReplica: Int): Unit = { +if (useAlterIsr) { + expandIsrWithAlterIsr(newInSyncReplica) +} else { + expandIsrWithZk(newInSyncReplica) +} + } + + private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = { +// This is called from maybeExpandIsr which holds the ISR write lock +if (pendingInSyncReplicaIds.isEmpty) { + // When expanding the ISR, we can safely assume the new replica will make it into the ISR since this puts us in + // a more constrained state for advancing the HW. + val newIsr = inSyncReplicaIds + newInSyncReplica + pendingInSyncReplicaIds = Some(newIsr) + debug(s"Adding new in-sync replica $newInSyncReplica. 
Pending ISR updated to [${newIsr.mkString(",")}] for $topicPartition") + alterIsr(newIsr) +} else { + debug(s"ISR update in-flight, not adding new in-sync replica $newInSyncReplica for $topicPartition") +} + } + + private def expandIsrWithZk(newInSyncReplica: Int): Unit = { +val newInSyncReplicaIds = inSyncReplicaIds + newInSyncReplica +info(s"Expanding ISR from ${inSyncReplicaIds.mkString(",")} to ${newInSyncReplicaIds.mkString(",")}") +val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newInSyncReplicaIds.toList, zkVersion) val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr) -maybeUpdateIsrAndVersion(newIsr, zkVersionOpt) +maybeUpdateIsrAndVersionWithZk(newInSyncReplicaIds, zkVersionOpt) } - private[cluster] def shrinkIsr(newIsr: Set[Int]): Unit = { + private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = { +if (useAlterIsr) { + shrinkIsrWithAlterIsr(outOfSyncReplicas) +} else { + shrinkIsrWithZk(inSyncReplicaIds -- outOfSyncReplicas) +} + } + + private def shrinkIsrWithAlterIsr(outOfSyncReplicas: Set[Int]): Unit = { +// This is called from maybeShrinkIsr which holds the ISR write lock +if (pendingInSyncReplicaIds.isEmpty) { + // When shrinking the ISR, we cannot assume that the update will succeed as this could erroneously advance the HW + // We update pendingInSyncReplicaIds here simply to prevent any further ISR updates from occurring until we get + // the next LeaderAndIsr + pendingInSyncReplicaIds = Some(inSyncReplicaIds) Review comment: How about `uncommittedInSyncReplicaIds`? 
## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -278,10 +284,10 @@ class KafkaController(val config: KafkaConfig, private def onControllerResignation(): Unit = { debug("Resigning") // de-register listeners - zkClient.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path) zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path) zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path) zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path) +zkClient.unregisterStateChangeHandler(isrChangeNotificationHandler.path) Review comment: nope, will revert This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r486535166 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -1210,19 +1243,66 @@ class Partition(val topicPartition: TopicPartition, } } - private def expandIsr(newIsr: Set[Int]): Unit = { -val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion) + private[cluster] def expandIsr(newInSyncReplica: Int): Unit = { +if (useAlterIsr) { + expandIsrWithAlterIsr(newInSyncReplica) +} else { + expandIsrWithZk(newInSyncReplica) +} + } + + private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = { +// This is called from maybeExpandIsr which holds the ISR write lock +if (pendingInSyncReplicaIds.isEmpty) { Review comment: Yea, good idea. I'll leave the check here since we actually acquire and release the lock when checking if we should expand/shrink. It's possible that pendingInSyncReplicaIds is cleared by a LeaderAndIsr before we acquire the write lock to do the update This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r486535166 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -1210,19 +1243,66 @@ class Partition(val topicPartition: TopicPartition, } } - private def expandIsr(newIsr: Set[Int]): Unit = { -val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion) + private[cluster] def expandIsr(newInSyncReplica: Int): Unit = { +if (useAlterIsr) { + expandIsrWithAlterIsr(newInSyncReplica) +} else { + expandIsrWithZk(newInSyncReplica) +} + } + + private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = { +// This is called from maybeExpandIsr which holds the ISR write lock +if (pendingInSyncReplicaIds.isEmpty) { Review comment: Yea, good idea. I'll leave the check here since we actually acquire and release the lock when checking if we should expand/shrink. It's possible that pendingInSyncReplicaIds is cleared by a LeaderAndIsr between then and when we acquire the write lock to do the update This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r486535166 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -1210,19 +1243,66 @@ class Partition(val topicPartition: TopicPartition, } } - private def expandIsr(newIsr: Set[Int]): Unit = { -val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion) + private[cluster] def expandIsr(newInSyncReplica: Int): Unit = { +if (useAlterIsr) { + expandIsrWithAlterIsr(newInSyncReplica) +} else { + expandIsrWithZk(newInSyncReplica) +} + } + + private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = { +// This is called from maybeExpandIsr which holds the ISR write lock +if (pendingInSyncReplicaIds.isEmpty) { Review comment: Yea, good idea. I'll leave the check here since we actually acquire and release the lock when checking if we should expand/shrink. It's possible that pendingInSyncReplicaIds is cleared by an update between then and when we acquire the write lock to do the update This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r486521810 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -255,6 +265,15 @@ class Partition(val topicPartition: TopicPartition, def isAddingReplica(replicaId: Int): Boolean = assignmentState.isAddingReplica(replicaId) + def inSyncReplicaIds(includeUncommittedReplicas: Boolean = false): Set[Int] = { Review comment: I think this sounds good, explicit over implicit and all that. If we have two methods like this, should we then make `inSyncReplicaIds` a private member? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r486521810 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -255,6 +265,15 @@ class Partition(val topicPartition: TopicPartition, def isAddingReplica(replicaId: Int): Boolean = assignmentState.isAddingReplica(replicaId) + def inSyncReplicaIds(includeUncommittedReplicas: Boolean = false): Set[Int] = { Review comment: I think this sounds good, explicit over implicit and all that. If we create two methods, should we then make `inSyncReplicaIds` a private member? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r478069145 ## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala ## @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} +import org.apache.kafka.common.utils.Time + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrManager { + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr, callback: Errors => Unit) + +class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val time: Time, + val brokerId: Int) extends AlterIsrManager with Logging with KafkaMetricsGroup { + + private val unsentIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new AtomicLong(0) + + @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { +unsentIsrUpdates synchronized { + unsentIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem + lastIsrChangeMs.set(time.milliseconds) + // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening + // in fast succession + if (scheduledRequest.isEmpty) { +scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)) + } +} + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +unsentIsrUpdates synchronized { + // when we get a new LeaderAndIsr, we clear out any pending requests + unsentIsrUpdates.remove(topicPartition) +} + } + + private def propagateIsrChanges(): Unit = { +val now = time.milliseconds() +unsentIsrUpdates synchronized { + if (unsentIsrUpdates.nonEmpty) { +val brokerEpoch: Long = zkClient.getBrokerEpoch(brokerId) match { Review comment: I wasn't too happy about this. 
Is there another way to get the current broker epoch? As I understand it, the broker epoch can change during the lifecycle of a broker. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r478067221 ## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala ## @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} +import org.apache.kafka.common.utils.Time + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrManager { + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr, callback: Errors => Unit) + +class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val time: Time, + val brokerId: Int) extends AlterIsrManager with Logging with KafkaMetricsGroup { + + private val unsentIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new AtomicLong(0) + + @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { +unsentIsrUpdates synchronized { + unsentIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem + lastIsrChangeMs.set(time.milliseconds) + // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening + // in fast succession + if (scheduledRequest.isEmpty) { Review comment: I think that sounds pretty reasonable. Would we need any kind of timeout at this layer, or just rely on the underlying channel to provide timeouts? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r475626573 ## File path: config/log4j.properties ## @@ -61,8 +61,8 @@ log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n log4j.logger.org.apache.zookeeper=INFO # Change the two lines below to adjust the general broker logging level (output to server.log and stdout) -log4j.logger.kafka=INFO -log4j.logger.org.apache.kafka=INFO +log4j.logger.kafka=DEBUG Review comment: Need to revert this stuff, didn't mean to commit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r475632719 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -1257,4 +1364,4 @@ class Partition(val topicPartition: TopicPartition, } partitionString.toString } -} +} Review comment: newline This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r475625111 ## File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ## @@ -325,7 +326,8 @@ UNSTABLE_OFFSET_COMMIT(88, "There are unstable offsets that need to be cleared.", UnstableOffsetCommitException::new), THROTTLING_QUOTA_EXCEEDED(89, "The throttling quota has been exceeded.", ThrottlingQuotaExceededException::new), PRODUCER_FENCED(90, "There is a newer producer with the same transactionalId " + -"which fences the current one.", ProducerFencedException::new); +"which fences the current one.", ProducerFencedException::new), +INVALID_UPDATE_VERSION(91, "The given ISR version was out-of-date.", InvalidUpdateVersionException::new); Review comment: This error message should be less specific This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r475630695 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -806,8 +840,9 @@ class Partition(val topicPartition: TopicPartition, // avoid unnecessary collection generation var newHighWatermark = leaderLog.logEndOffsetMetadata remoteReplicasMap.values.foreach { replica => +// Note here we are using effectiveInSyncReplicaIds, see explanation above Review comment: Fix comment to refer to correct variable This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r475629593 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -635,7 +666,7 @@ class Partition(val topicPartition: TopicPartition, // check if we need to expand ISR to include this replica // if it is not in the ISR yet Review comment: Expand on this comment to discuss the maximal ISR This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r475627290 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -141,7 +142,8 @@ object Partition extends KafkaMetricsGroup { stateStore = zkIsrBackingStore, delayedOperations = delayedOperations, metadataCache = replicaManager.metadataCache, - logManager = replicaManager.logManager) + logManager = replicaManager.logManager, + alterIsrChannelManager = replicaManager.alterIsrManager) Review comment: Rename to alterIsrManager This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r474959176 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) + +class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val brokerId: Int, + val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup { + + private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new AtomicLong(0) + + @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { +pendingIsrUpdates synchronized { + pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem + lastIsrChangeMs.set(System.currentTimeMillis()) + // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening + // in fast succession + if (scheduledRequest.isEmpty) { +scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)) + } +} + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +pendingIsrUpdates synchronized { + // when we get a new LeaderAndIsr, we clear out any pending requests + pendingIsrUpdates.remove(topicPartition) +} + } + + override def startup(): Unit = { +controllerChannelManager.start() + } + + override def shutdown(): Unit = { 
+controllerChannelManager.shutdown() + } + + private def propagateIsrChanges(): Unit = { +val now = System.currentTimeMillis() +pendingIsrUpdates synchronized { + if (pendingIsrUpdates.nonEmpty) { +// Max ISRs to send? +val message = new AlterIsrRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpoch) + .setTopics(new util.ArrayList()) + + pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, items) => { + val topicPart = new AlterIsrRequestTopics() +.setName(topic) +.setPartitions(new util.ArrayList()) + message.topics().add(topicPart) + items.foreach(item => { +topicPart.partitions().add(new AlterIsrRequestPartitions() + .setPartitionIndex(item.topicPartition.partition()) + .setLeaderId(item.leaderAndIsr.leader) + .setLeaderEpoch(item.leaderAndIsr.leaderEpoch) + .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava) + .setCurrentIsrVersion(item.leaderAndIsr.zkVersion) +) + }) +}) + +def responseHandler(response: ClientResponse): Unit = { Review comment: Continued in https://github.com/apache/kafka/pull/9100#discussion_r474934833 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r474934833 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) + +class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val brokerId: Int, + val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup { + + private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new AtomicLong(0) + + @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { +pendingIsrUpdates synchronized { + pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem + lastIsrChangeMs.set(System.currentTimeMillis()) + // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening + // in fast succession + if (scheduledRequest.isEmpty) { +scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)) + } +} + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +pendingIsrUpdates synchronized { + // when we get a new LeaderAndIsr, we clear out any pending requests + pendingIsrUpdates.remove(topicPartition) Review comment: Update: after some discussion and looking over failed system tests, we ended up with the following error handling: * 
REPLICA_NOT_AVAILABLE and INVALID_REPLICA_ASSIGNMENT will clear the pending ISR to let the leader retry. This covers a case where a leader tries to add a replica to the ISR while that replica (the follower) is offline because it just finished shutting down. * INVALID_UPDATE_VERSION will not clear the pending ISR since the broker has stale metadata. * FENCED_LEADER_EPOCH, NOT_LEADER_OR_FOLLOWER, UNKNOWN_TOPIC_OR_PARTITION will _not_ clear the pending state and therefore will not retry. We presume here that the controller is correct and the leader has old metadata. By not clearing the pending ISR, the leader will await LeaderAndIsr before attempting any further ISR changes. * Other unspecified errors: clear the pending state and let the leader retry. Not sure what cases could cause other errors, but it is probably better to be in a retry loop than to be completely stuck. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r474934833 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) + +class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val brokerId: Int, + val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup { + + private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new AtomicLong(0) + + @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { +pendingIsrUpdates synchronized { + pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem + lastIsrChangeMs.set(System.currentTimeMillis()) + // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening + // in fast succession + if (scheduledRequest.isEmpty) { +scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)) + } +} + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +pendingIsrUpdates synchronized { + // when we get a new LeaderAndIsr, we clear out any pending requests + pendingIsrUpdates.remove(topicPartition) Review comment: Update: after some discussion and looking over failed system tests, we ended up with the following error handling: * 
REPLICA_NOT_AVAILABLE and INVALID_REPLICA_ASSIGNMENT will clear the pending ISR to let the leader retry. This covers a case where a leader tries to add a replica to the ISR while that replica (the follower) is offline because it just finished shutting down. * FENCED_LEADER_EPOCH, NOT_LEADER_OR_FOLLOWER, UNKNOWN_TOPIC_OR_PARTITION will _not_ clear the pending state and therefore will not retry. We presume here that the controller is correct and the leader has old metadata. By not clearing the pending ISR, the leader will await LeaderAndIsr before attempting any further ISR changes. * Other unspecified errors: clear the pending state and let the leader retry. Not sure what cases could cause other errors, but it is probably better to be in a retry loop than to be completely stuck. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r474930329 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) + +class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val brokerId: Int, + val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup { + + private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new AtomicLong(0) + + @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { +pendingIsrUpdates synchronized { + pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem + lastIsrChangeMs.set(System.currentTimeMillis()) + // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening + // in fast succession + if (scheduledRequest.isEmpty) { +scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)) + } +} + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +pendingIsrUpdates synchronized { + // when we get a new LeaderAndIsr, we clear out any pending requests + pendingIsrUpdates.remove(topicPartition) +} + } + + override def startup(): Unit = { +controllerChannelManager.start() + } + + override def shutdown(): Unit = { 
+controllerChannelManager.shutdown() + } + + private def propagateIsrChanges(): Unit = { +val now = System.currentTimeMillis() +pendingIsrUpdates synchronized { + if (pendingIsrUpdates.nonEmpty) { +// Max ISRs to send? +val message = new AlterIsrRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpoch) + .setTopics(new util.ArrayList()) + + pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, items) => { + val topicPart = new AlterIsrRequestTopics() Review comment: It's a little tricky since LeaderAndIsr isn't visible to AlterIsrRequest. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r474106591 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) + +class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val brokerId: Int, + val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup { + + private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new AtomicLong(0) + + @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { +pendingIsrUpdates synchronized { + pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem + lastIsrChangeMs.set(System.currentTimeMillis()) + // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening + // in fast succession + if (scheduledRequest.isEmpty) { +scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)) + } +} + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +pendingIsrUpdates synchronized { + // when we get a new LeaderAndIsr, we clear out any pending requests + pendingIsrUpdates.remove(topicPartition) +} + } + + override def startup(): Unit = { +controllerChannelManager.start() + } + + override def shutdown(): Unit = { 
+controllerChannelManager.shutdown() + } + + private def propagateIsrChanges(): Unit = { +val now = System.currentTimeMillis() +pendingIsrUpdates synchronized { + if (pendingIsrUpdates.nonEmpty) { +// Max ISRs to send? +val message = new AlterIsrRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpoch) + .setTopics(new util.ArrayList()) + + pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, items) => { + val topicPart = new AlterIsrRequestTopics() +.setName(topic) +.setPartitions(new util.ArrayList()) + message.topics().add(topicPart) + items.foreach(item => { +topicPart.partitions().add(new AlterIsrRequestPartitions() + .setPartitionIndex(item.topicPartition.partition()) + .setLeaderId(item.leaderAndIsr.leader) + .setLeaderEpoch(item.leaderAndIsr.leaderEpoch) + .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava) + .setCurrentIsrVersion(item.leaderAndIsr.zkVersion) +) + }) +}) + +def responseHandler(response: ClientResponse): Unit = { Review comment: I found a race during the system tests when a follower is shutting down. The controller handles the shut down before it handles an AlterIsr. If the proposed ISR includes the now-offline replica, the controller refuses to update that ISR change and returns an error for that partition. It then sends out the current LeaderAndIsr. The problem is that the broker ignores this LeaderAndIsr since it has the same leader epoch. This is easy enou
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r474106591 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) + +class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val brokerId: Int, + val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup { + + private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new AtomicLong(0) + + @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { +pendingIsrUpdates synchronized { + pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem + lastIsrChangeMs.set(System.currentTimeMillis()) + // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening + // in fast succession + if (scheduledRequest.isEmpty) { +scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)) + } +} + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +pendingIsrUpdates synchronized { + // when we get a new LeaderAndIsr, we clear out any pending requests + pendingIsrUpdates.remove(topicPartition) +} + } + + override def startup(): Unit = { +controllerChannelManager.start() + } + + override def shutdown(): Unit = { 
+controllerChannelManager.shutdown() + } + + private def propagateIsrChanges(): Unit = { +val now = System.currentTimeMillis() +pendingIsrUpdates synchronized { + if (pendingIsrUpdates.nonEmpty) { +// Max ISRs to send? +val message = new AlterIsrRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpoch) + .setTopics(new util.ArrayList()) + + pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, items) => { + val topicPart = new AlterIsrRequestTopics() +.setName(topic) +.setPartitions(new util.ArrayList()) + message.topics().add(topicPart) + items.foreach(item => { +topicPart.partitions().add(new AlterIsrRequestPartitions() + .setPartitionIndex(item.topicPartition.partition()) + .setLeaderId(item.leaderAndIsr.leader) + .setLeaderEpoch(item.leaderAndIsr.leaderEpoch) + .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava) + .setCurrentIsrVersion(item.leaderAndIsr.zkVersion) +) + }) +}) + +def responseHandler(response: ClientResponse): Unit = { Review comment: I found a race during the system tests when a broker is shutting down. The controller handles the shut down before it handles an AlterIsr. If the proposed ISR includes the now-offline replica, the controller refuses to update that ISR change and returns an error for that partition. It then sends out the current LeaderAndIsr. The problem is that the broker ignores this LeaderAndIsr since it has the same leader epoch. This is easy enough
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r474070847 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) + +class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val brokerId: Int, + val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup { + + private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new AtomicLong(0) + + @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { +pendingIsrUpdates synchronized { + pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem + lastIsrChangeMs.set(System.currentTimeMillis()) + // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening + // in fast succession + if (scheduledRequest.isEmpty) { +scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)) + } +} + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +pendingIsrUpdates synchronized { + // when we get a new LeaderAndIsr, we clear out any pending requests + pendingIsrUpdates.remove(topicPartition) +} + } + + override def startup(): Unit = { +controllerChannelManager.start() + } + + override def shutdown(): Unit = { 
+controllerChannelManager.shutdown() + } + + private def propagateIsrChanges(): Unit = { +val now = System.currentTimeMillis() +pendingIsrUpdates synchronized { + if (pendingIsrUpdates.nonEmpty) { +// Max ISRs to send? +val message = new AlterIsrRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpoch) + .setTopics(new util.ArrayList()) + + pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, items) => { + val topicPart = new AlterIsrRequestTopics() +.setName(topic) +.setPartitions(new util.ArrayList()) + message.topics().add(topicPart) + items.foreach(item => { +topicPart.partitions().add(new AlterIsrRequestPartitions() + .setPartitionIndex(item.topicPartition.partition()) + .setLeaderId(item.leaderAndIsr.leader) + .setLeaderEpoch(item.leaderAndIsr.leaderEpoch) + .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava) + .setCurrentIsrVersion(item.leaderAndIsr.zkVersion) +) + }) +}) + +def responseHandler(response: ClientResponse): Unit = { Review comment: Turns out we do need to clear it. In the common case, the controller will still send out LeaderAndIsr following a failed ISR update. However, during testing I did find a case where a LeaderAndIsr failed to go out leading to a stuck leader. * ISR = {1,2} * Broker 3 comes up * Leader sends ISR = {1,2,3} * Broker 3 goes down * Controller handles broker shutdown * Controller handles AlterIsr, resulting LeaderAndIsr batch fails
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r474070847 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) + +class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val brokerId: Int, + val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup { + + private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new AtomicLong(0) + + @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { +pendingIsrUpdates synchronized { + pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem + lastIsrChangeMs.set(System.currentTimeMillis()) + // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening + // in fast succession + if (scheduledRequest.isEmpty) { +scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)) + } +} + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +pendingIsrUpdates synchronized { + // when we get a new LeaderAndIsr, we clear out any pending requests + pendingIsrUpdates.remove(topicPartition) +} + } + + override def startup(): Unit = { +controllerChannelManager.start() + } + + override def shutdown(): Unit = { 
+controllerChannelManager.shutdown() + } + + private def propagateIsrChanges(): Unit = { +val now = System.currentTimeMillis() +pendingIsrUpdates synchronized { + if (pendingIsrUpdates.nonEmpty) { +// Max ISRs to send? +val message = new AlterIsrRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpoch) + .setTopics(new util.ArrayList()) + + pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, items) => { + val topicPart = new AlterIsrRequestTopics() +.setName(topic) +.setPartitions(new util.ArrayList()) + message.topics().add(topicPart) + items.foreach(item => { +topicPart.partitions().add(new AlterIsrRequestPartitions() + .setPartitionIndex(item.topicPartition.partition()) + .setLeaderId(item.leaderAndIsr.leader) + .setLeaderEpoch(item.leaderAndIsr.leaderEpoch) + .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava) + .setCurrentIsrVersion(item.leaderAndIsr.zkVersion) +) + }) +}) + +def responseHandler(response: ClientResponse): Unit = { Review comment: Turns out we do need to clear it. In the common case, the controller will still send out LeaderAndIsr following a failed ISR update. However, during testing I did find a case where a LeaderAndIsr failed to go out leading to a stuck leader. * ISR = {1,2} * Broker 3 comes up * Leader sends ISR = {1,2,3} * Broker 3 goes down * Controller handles broker shutdown * Controller handles AlterIsr, resulting LeaderAndIsr batch fails
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r474070847 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) + +class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val brokerId: Int, + val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup { + + private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new AtomicLong(0) + + @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { +pendingIsrUpdates synchronized { + pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem + lastIsrChangeMs.set(System.currentTimeMillis()) + // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening + // in fast succession + if (scheduledRequest.isEmpty) { +scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)) + } +} + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +pendingIsrUpdates synchronized { + // when we get a new LeaderAndIsr, we clear out any pending requests + pendingIsrUpdates.remove(topicPartition) +} + } + + override def startup(): Unit = { +controllerChannelManager.start() + } + + override def shutdown(): Unit = { 
+controllerChannelManager.shutdown() + } + + private def propagateIsrChanges(): Unit = { +val now = System.currentTimeMillis() +pendingIsrUpdates synchronized { + if (pendingIsrUpdates.nonEmpty) { +// Max ISRs to send? +val message = new AlterIsrRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpoch) + .setTopics(new util.ArrayList()) + + pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, items) => { + val topicPart = new AlterIsrRequestTopics() +.setName(topic) +.setPartitions(new util.ArrayList()) + message.topics().add(topicPart) + items.foreach(item => { +topicPart.partitions().add(new AlterIsrRequestPartitions() + .setPartitionIndex(item.topicPartition.partition()) + .setLeaderId(item.leaderAndIsr.leader) + .setLeaderEpoch(item.leaderAndIsr.leaderEpoch) + .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava) + .setCurrentIsrVersion(item.leaderAndIsr.zkVersion) +) + }) +}) + +def responseHandler(response: ClientResponse): Unit = { Review comment: Turns out we do need to clear it. In the common case, the controller will still send out LeaderAndIsr following a failed ISR update. However, during testing I did find a case where a LeaderAndIsr failed to go out leading to a stuck leader. * ISR = {1,2} * Broker 3 comes up * Leader sends ISR = {1,2,3} * Broker 3 goes down * Controller handles broker shutdown * Controller handles AlterIsr, resulting LeaderAndIsr batch fails
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r470678123 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -255,6 +255,10 @@ class Partition(val topicPartition: TopicPartition, def isAddingReplica(replicaId: Int): Boolean = assignmentState.isAddingReplica(replicaId) + // For advancing the HW we assume the largest ISR even if the controller hasn't made the change yet + // This set includes the latest ISR (as we learned from LeaderAndIsr) and any replicas from a pending ISR expansion + def effectiveInSyncReplicaIds: Set[Int] = inSyncReplicaIds | pendingInSyncReplicaIds Review comment: Since we are now only allowing one in-flight AlterIsr, I changed the semantics of pendingInSyncReplicaIds to be the maximal "effective" ISR. This way we don't need to compute it each time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r470675300 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) + +class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val brokerId: Int, + val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup { + + private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new AtomicLong(0) + + @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { +pendingIsrUpdates synchronized { + pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem + lastIsrChangeMs.set(System.currentTimeMillis()) + // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening + // in fast succession + if (scheduledRequest.isEmpty) { +scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)) + } +} + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +pendingIsrUpdates synchronized { + // when we get a new LeaderAndIsr, we clear out any pending requests + pendingIsrUpdates.remove(topicPartition) Review comment: With the latest changes to prevent multiple in-flight requests, I don't think this should happen for a given partition. 
Even if it did, the retried in-flight request from BrokerToControllerRequestThread would fail on the controller because of its old ISR version. I'm wondering whether we even need this clearPending behavior. Since I changed the AlterIsr request to fire after at most 50ms, there is only a narrow window between enqueueing an ISR update and receiving a LeaderAndIsr. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r470673412 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) + +class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val brokerId: Int, + val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup { Review comment: Fixed this by adding getBrokerEpoch to KafkaZkClient This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r470063833 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -485,13 +490,11 @@ class Partition(val topicPartition: TopicPartition, def makeLeader(partitionState: LeaderAndIsrPartitionState, Review comment: I don't think AlterIsr changes anything, since we're now just sending an async ISR update where previously we were updating ZK directly. Looking at the usages, `updateFollowerFetchState` is only called following a read (`Partition#readRecords`). These reads only happen on fetch requests and from the alter log dirs fetcher. I'm not sure about the alter log dirs flow, but as long as it sends the leader epoch, it should be safe. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r470053173 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -569,8 +575,8 @@ class Partition(val topicPartition: TopicPartition, val oldLeaderEpoch = leaderEpoch // record the epoch of the controller that made the leadership decision. This is useful while updating the isr // to maintain the decision maker controller's epoch in the zookeeper path - controllerEpoch = partitionState.controllerEpoch + info(s"Follower ignoring ISR for $topicPartition") Review comment: Yeah, many of these log levels will be lowered; I was only using them during development. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r469467881 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -1210,28 +1218,20 @@ class Partition(val topicPartition: TopicPartition, } } - private def expandIsr(newIsr: Set[Int]): Unit = { -val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion) -val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr) -maybeUpdateIsrAndVersion(newIsr, zkVersionOpt) - } + private def expandIsr(newInSyncReplica: Int): Unit = { +pendingInSyncReplicaIds += newInSyncReplica +info(s"Adding new in-sync replica $newInSyncReplica. Pending ISR updated to [${pendingInSyncReplicaIds.mkString(",")}]") - private[cluster] def shrinkIsr(newIsr: Set[Int]): Unit = { -val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion) -val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr) -maybeUpdateIsrAndVersion(newIsr, zkVersionOpt) +val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, pendingInSyncReplicaIds.toList, zkVersion) +alterIsrChannelManager.enqueueIsrUpdate(AlterIsrItem(topicPartition, newLeaderAndIsr)) } - private[cluster] def maybeUpdateIsrAndVersion(isr: Set[Int], zkVersionOpt: Option[Int]): Unit = { -zkVersionOpt match { - case Some(newVersion) => -inSyncReplicaIds = isr -zkVersion = newVersion -info("ISR updated to [%s] and zkVersion updated to [%d]".format(isr.mkString(","), zkVersion)) + private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = { +pendingInSyncReplicaIds --= outOfSyncReplicas Review comment: I'm currently looking at the effective ISR to find new out of sync replicas. This can include new ISR members which haven't made it into the "true" ISR via LeaderAndIsr yet (like broker=3 in your example). Maybe we should only consider removing ISR members iff they are in the true ISR. 
IOW changing from ```scala val candidateReplicaIds = effectiveInSyncReplicaIds - localBrokerId ``` to ```scala val candidateReplicaIds = inSyncReplicaIds - localBrokerId ``` Also, I wonder if the batching that's happening in AlterIsrChannelManager violates the model. It sends the request asynchronously with a small delay, so multiple ISR changes can be batched into one AlterIsr. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r469460462 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L Review comment: We don't, these were copied over from the ReplicaManager's ISR propagation logic. I'll clean this up This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465750632 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) + +class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val brokerId: Int, + val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup { + + private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new AtomicLong(0) + + @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { +pendingIsrUpdates synchronized { + pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem + lastIsrChangeMs.set(System.currentTimeMillis()) + // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening + // in fast succession + if (scheduledRequest.isEmpty) { +scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)) + } +} + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +pendingIsrUpdates synchronized { + // when we get a new LeaderAndIsr, we clear out any pending requests + pendingIsrUpdates.remove(topicPartition) +} + } + + override def startup(): Unit = { +controllerChannelManager.start() + } + + override def shutdown(): Unit = { 
+controllerChannelManager.shutdown() + } + + private def propagateIsrChanges(): Unit = { +val now = System.currentTimeMillis() +pendingIsrUpdates synchronized { + if (pendingIsrUpdates.nonEmpty) { +// Max ISRs to send? +val message = new AlterIsrRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpoch) + .setTopics(new util.ArrayList()) + + pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, items) => { + val topicPart = new AlterIsrRequestTopics() +.setName(topic) +.setPartitions(new util.ArrayList()) + message.topics().add(topicPart) + items.foreach(item => { +topicPart.partitions().add(new AlterIsrRequestPartitions() + .setPartitionIndex(item.topicPartition.partition()) + .setLeaderId(item.leaderAndIsr.leader) + .setLeaderEpoch(item.leaderAndIsr.leaderEpoch) + .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava) + .setCurrentIsrVersion(item.leaderAndIsr.zkVersion) +) + }) +}) + +def responseHandler(response: ClientResponse): Unit = { + println(response.responseBody().toString(response.requestHeader().apiVersion())) + val body: AlterIsrResponse = response.responseBody().asInstanceOf[AlterIsrResponse] + val data: AlterIsrResponseData = body.data() + Errors.forCode(data.errorCode()) match { +case Errors.NONE => info(s"Controller handled AlterIsr request") +case e: Errors => warn(s"Controller returned an error when handling AlterIsr request:
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465750445 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) + +class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val brokerId: Int, + val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup { + + private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new AtomicLong(0) + + @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { +pendingIsrUpdates synchronized { + pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem + lastIsrChangeMs.set(System.currentTimeMillis()) + // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening + // in fast succession + if (scheduledRequest.isEmpty) { +scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)) + } +} + } + + override def clearPending(topicPartition: TopicPartition): Unit = { +pendingIsrUpdates synchronized { + // when we get a new LeaderAndIsr, we clear out any pending requests + pendingIsrUpdates.remove(topicPartition) +} + } + + override def startup(): Unit = { +controllerChannelManager.start() + } + + override def shutdown(): Unit = { 
+controllerChannelManager.shutdown() + } + + private def propagateIsrChanges(): Unit = { +val now = System.currentTimeMillis() +pendingIsrUpdates synchronized { + if (pendingIsrUpdates.nonEmpty) { +// Max ISRs to send? +val message = new AlterIsrRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpoch) + .setTopics(new util.ArrayList()) + + pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, items) => { + val topicPart = new AlterIsrRequestTopics() +.setName(topic) +.setPartitions(new util.ArrayList()) + message.topics().add(topicPart) + items.foreach(item => { +topicPart.partitions().add(new AlterIsrRequestPartitions() + .setPartitionIndex(item.topicPartition.partition()) + .setLeaderId(item.leaderAndIsr.leader) + .setLeaderEpoch(item.leaderAndIsr.leaderEpoch) + .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava) + .setCurrentIsrVersion(item.leaderAndIsr.zkVersion) +) + }) +}) + +def responseHandler(response: ClientResponse): Unit = { + println(response.responseBody().toString(response.requestHeader().apiVersion())) Review comment: Remove this This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465749546 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) Review comment: Should probably get rid of this and change the method to `enqueueIsrUpdate(TopicPartition, LeaderAndIsr)` ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) Review comment: Should probably get rid of this and change the method to ```enqueueIsrUpdate(TopicPartition, LeaderAndIsr)``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465749546 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) Review comment: Should probably get rid of this and change the method to ```enqueueIsrUpdate(TopicPartition, LeaderAndIsr)``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465748120 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -1771,6 +1776,141 @@ class KafkaController(val config: KafkaConfig, } } + def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = { +val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]() + +alterIsrRequest.topics().forEach(topicReq => topicReq.partitions().forEach(partitionReq => { + val tp = new TopicPartition(topicReq.name(), partitionReq.partitionIndex()) + val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt) + isrsToAlter.put(tp, new LeaderAndIsr(partitionReq.leaderId(), partitionReq.leaderEpoch(), newIsr, partitionReq.currentIsrVersion())) +})) + +def responseCallback(results: Either[Map[TopicPartition, Errors], Errors]): Unit = { + val resp = new AlterIsrResponseData() + results match { +case Right(error) => + resp.setErrorCode(error.code()) +case Left(partitions: Map[TopicPartition, Errors]) => + resp.setTopics(new util.ArrayList()) + partitions.groupBy(_._1.topic()).foreachEntry((topic, partitionMap) => { +val topicResp = new AlterIsrResponseTopics() + .setName(topic) + .setPartitions(new util.ArrayList()) +resp.topics().add(topicResp) +partitionMap.foreachEntry((partition, error) => { + topicResp.partitions().add( +new AlterIsrResponsePartitions() + .setPartitionIndex(partition.partition()) + .setErrorCode(error.code())) +}) + }) + } + callback.apply(resp) +} + +eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId(), alterIsrRequest.brokerEpoch(), isrsToAlter, responseCallback)) + } + + private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr], + callback: AlterIsrCallback): Unit = { +if (!isActive) { + callback.apply(Right(Errors.NOT_CONTROLLER)) + return +} + +val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId) +if 
(brokerEpochOpt.isEmpty) { + info(s"Ignoring AlterIsr due to unknown broker $brokerId") + callback.apply(Right(Errors.STALE_BROKER_EPOCH)) + return +} + +if (!brokerEpochOpt.contains(brokerEpoch)) { + info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for broker $brokerId") + callback.apply(Right(Errors.STALE_BROKER_EPOCH)) + return +} + +val partitionErrors: mutable.Map[TopicPartition, Errors] = mutable.HashMap[TopicPartition, Errors]() + +val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap { + case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) => +val partitionError: Errors = controllerContext.partitionLeadershipInfo(tp) match { + case Some(leaderIsrAndControllerEpoch) => +val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr +if (newLeaderAndIsr.leader != currentLeaderAndIsr.leader) { + Errors.NOT_LEADER_OR_FOLLOWER +} else if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) { + Errors.FENCED_LEADER_EPOCH +} else { + val currentAssignment = controllerContext.partitionReplicaAssignment(tp) + if (!newLeaderAndIsr.isr.forall(replicaId => currentAssignment.contains(replicaId))) { +warn(s"Some of the proposed ISR are not in the assignment for partition $tp. Proposed ISR=$newLeaderAndIsr.isr assignment=$currentAssignment") +Errors.INVALID_REQUEST + } else if (!newLeaderAndIsr.isr.forall(replicaId => controllerContext.isReplicaOnline(replicaId, tp))) { +warn(s"Some of the proposed ISR are offline for partition $tp. 
Proposed ISR=$newLeaderAndIsr.isr") +Errors.INVALID_REQUEST + } else { +Errors.NONE + } +} + case None => Errors.UNKNOWN_TOPIC_OR_PARTITION +} +if (partitionError == Errors.NONE) { + // Bump the leaderEpoch for partitions that we're going to write + Some(tp -> newLeaderAndIsr.newEpochAndZkVersion) +} else { + partitionErrors.put(tp, partitionError) + None +} +} + +// Do the updates in ZK +info(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.") +val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = zkClient.updateLeaderAndIsr( + adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion) + +val successfulUpdates: Map[TopicPartition, LeaderAndIsr] = finishedUpdates.flatMap { + case (partition
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465748504 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,121 @@ +package kafka.server Review comment: Yeah, maybe just "AlterIsrManager"? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465746162 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -1771,6 +1776,141 @@ class KafkaController(val config: KafkaConfig, } } + def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = { +val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]() + +alterIsrRequest.topics().forEach(topicReq => topicReq.partitions().forEach(partitionReq => { Review comment: It might be simpler just to use AlterIsrRequestData and AlterIsrResponseData throughout this code (rather than converting to `Map[TopicPartition, LeaderAndIsr]` and `Map[TopicPartition, Errors]`) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465743922 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig, } } + // TODO is it okay to pull message classes down into the controller? + def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = { +//val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId()) +/*if (brokerEpochOpt.isEmpty) { + info(s"Ignoring AlterIsr due to unknown broker ${alterIsrRequest.brokerId()}") + // TODO is INVALID_REQUEST a reasonable error here? + callback.apply(new AlterIsrResponseData().setErrorCode(Errors.INVALID_REQUEST.code)) + return +} + +if (!brokerEpochOpt.contains(alterIsrRequest.brokerEpoch())) { + info(s"Ignoring AlterIsr due to stale broker epoch ${alterIsrRequest.brokerEpoch()} for broker ${alterIsrRequest.brokerId()}") + callback.apply(new AlterIsrResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)) + return +}*/ + +val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]() + +val resp = new AlterIsrResponseData() +resp.setTopics(new util.ArrayList()) + +alterIsrRequest.topics().forEach(topicReq => { + val topicResp = new AlterIsrResponseTopics() +.setName(topicReq.name()) +.setPartitions(new util.ArrayList()) + resp.topics().add(topicResp) + + topicReq.partitions().forEach(partitionReq => { +val partitionResp = new AlterIsrResponsePartitions() + .setPartitionIndex(partitionReq.partitionIndex()) +topicResp.partitions().add(partitionResp) + +// For each partition who's ISR we are altering, let's do some upfront validation for the broker response Review comment: @cmccabe good to know about `controllerContext` 👍 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r465029352 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig, } } + // TODO is it okay to pull message classes down into the controller? + def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = { +//val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId()) +/*if (brokerEpochOpt.isEmpty) { + info(s"Ignoring AlterIsr due to unknown broker ${alterIsrRequest.brokerId()}") + // TODO is INVALID_REQUEST a reasonable error here? + callback.apply(new AlterIsrResponseData().setErrorCode(Errors.INVALID_REQUEST.code)) + return +} + +if (!brokerEpochOpt.contains(alterIsrRequest.brokerEpoch())) { + info(s"Ignoring AlterIsr due to stale broker epoch ${alterIsrRequest.brokerEpoch()} for broker ${alterIsrRequest.brokerId()}") + callback.apply(new AlterIsrResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)) + return +}*/ + +val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]() + +val resp = new AlterIsrResponseData() +resp.setTopics(new util.ArrayList()) + +alterIsrRequest.topics().forEach(topicReq => { + val topicResp = new AlterIsrResponseTopics() +.setName(topicReq.name()) +.setPartitions(new util.ArrayList()) + resp.topics().add(topicResp) + + topicReq.partitions().forEach(partitionReq => { +val partitionResp = new AlterIsrResponsePartitions() + .setPartitionIndex(partitionReq.partitionIndex()) +topicResp.partitions().add(partitionResp) + +// For each partition who's ISR we are altering, let's do some upfront validation for the broker response Review comment: Ah, I see what you mean. 
Initially, I was concerned about blocking for too long while waiting for a response, but it looks like there is precedent for this pattern for some requests (reassignment, leader election, controlled shutdown). I'll move this validation and the callback down into the event processor method. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r464717171 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig, } } + // TODO is it okay to pull message classes down into the controller? + def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = { +//val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId()) +/*if (brokerEpochOpt.isEmpty) { + info(s"Ignoring AlterIsr due to unknown broker ${alterIsrRequest.brokerId()}") + // TODO is INVALID_REQUEST a reasonable error here? + callback.apply(new AlterIsrResponseData().setErrorCode(Errors.INVALID_REQUEST.code)) + return +} + +if (!brokerEpochOpt.contains(alterIsrRequest.brokerEpoch())) { + info(s"Ignoring AlterIsr due to stale broker epoch ${alterIsrRequest.brokerEpoch()} for broker ${alterIsrRequest.brokerId()}") + callback.apply(new AlterIsrResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)) + return +}*/ + +val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]() + +val resp = new AlterIsrResponseData() +resp.setTopics(new util.ArrayList()) + +alterIsrRequest.topics().forEach(topicReq => { + val topicResp = new AlterIsrResponseTopics() +.setName(topicReq.name()) +.setPartitions(new util.ArrayList()) + resp.topics().add(topicResp) + + topicReq.partitions().forEach(partitionReq => { +val partitionResp = new AlterIsrResponsePartitions() + .setPartitionIndex(partitionReq.partitionIndex()) +topicResp.partitions().add(partitionResp) + +// For each partition who's ISR we are altering, let's do some upfront validation for the broker response Review comment: The main rationale for validating in the request handler is so we can return meaningful partition-level errors to the broker (fenced leader, not leader or follower, etc). 
That said, I'm not sure the broker could do anything useful with these errors, since it probably has stale metadata in these cases. The KIP calls out four partition-level errors. Do we actually need them? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r464716040 ## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ## @@ -0,0 +1,121 @@ +package kafka.server + +import java.util +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. 
Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 6L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr, callback: Errors => Unit) + +class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val brokerId: Int, + val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup { + + private val pendingIsrUpdates: mutable.Queue[AlterIsrItem] = new mutable.Queue[AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new AtomicLong(0) + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { +pendingIsrUpdates synchronized { + pendingIsrUpdates += alterIsrItem + lastIsrChangeMs.set(System.currentTimeMillis()) +} + } + + override def startup(): Unit = { +scheduler.schedule("alter-isr-send", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS) Review comment: That makes sense. I'll change that (this was pulled in from the previous ISR notification code in ReplicaManager) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org