mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r474106591
########## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ########## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 60000L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) + +class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val brokerId: Int, + val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup { + + private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new 
AtomicLong(0) + + @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { + pendingIsrUpdates synchronized { + pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem + lastIsrChangeMs.set(System.currentTimeMillis()) + // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening + // in fast succession + if (scheduledRequest.isEmpty) { + scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)) + } + } + } + + override def clearPending(topicPartition: TopicPartition): Unit = { + pendingIsrUpdates synchronized { + // when we get a new LeaderAndIsr, we clear out any pending requests + pendingIsrUpdates.remove(topicPartition) + } + } + + override def startup(): Unit = { + controllerChannelManager.start() + } + + override def shutdown(): Unit = { + controllerChannelManager.shutdown() + } + + private def propagateIsrChanges(): Unit = { + val now = System.currentTimeMillis() + pendingIsrUpdates synchronized { + if (pendingIsrUpdates.nonEmpty) { + // Max ISRs to send? 
+ val message = new AlterIsrRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpoch) + .setTopics(new util.ArrayList()) + + pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, items) => { + val topicPart = new AlterIsrRequestTopics() + .setName(topic) + .setPartitions(new util.ArrayList()) + message.topics().add(topicPart) + items.foreach(item => { + topicPart.partitions().add(new AlterIsrRequestPartitions() + .setPartitionIndex(item.topicPartition.partition()) + .setLeaderId(item.leaderAndIsr.leader) + .setLeaderEpoch(item.leaderAndIsr.leaderEpoch) + .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava) + .setCurrentIsrVersion(item.leaderAndIsr.zkVersion) + ) + }) + }) + + def responseHandler(response: ClientResponse): Unit = { Review comment: I found a race during the system tests when a broker is shutting down. The controller processes the broker's shutdown before it handles the AlterIsr request. If the proposed ISR includes the now-offline replica, the controller rejects that ISR change and returns an error for that partition. It then sends out the current LeaderAndIsr. The problem is that the broker ignores this LeaderAndIsr because it carries the same leader epoch the broker already has. This is easy enough to fix: we can bump the leader epoch in the controller (and in ZK) before sending it out. However, there is still the case where the controller fails to update ZK. I think we should probably treat that the same way as an offline replica. If we simply return an error in the AlterIsr response and let the leader reset its pending ISR state, the leader will just retry with stale metadata and the update will fail again. I think in all of these error cases we must bump the leader epoch to force the leader to accept the new LeaderAndIsr. Thoughts? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org