mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r474070847



##########
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##########
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent from the controller.
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 60000L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager,
+                                 val zkClient: KafkaZkClient,
+                                 val scheduler: Scheduler,
+                                 val brokerId: Int,
+                                 val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+    pendingIsrUpdates synchronized {
+      pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+      lastIsrChangeMs.set(System.currentTimeMillis())
+      // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening
+      // in fast succession
+      if (scheduledRequest.isEmpty) {
+        scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+      }
+    }
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+    pendingIsrUpdates synchronized {
+      // when we get a new LeaderAndIsr, we clear out any pending requests
+      pendingIsrUpdates.remove(topicPartition)
+    }
+  }
+
+  override def startup(): Unit = {
+    controllerChannelManager.start()
+  }
+
+  override def shutdown(): Unit = {
+    controllerChannelManager.shutdown()
+  }
+
+  private def propagateIsrChanges(): Unit = {
+    val now = System.currentTimeMillis()
+    pendingIsrUpdates synchronized {
+      if (pendingIsrUpdates.nonEmpty) {
+        // Max ISRs to send?
+        val message = new AlterIsrRequestData()
+          .setBrokerId(brokerId)
+          .setBrokerEpoch(brokerEpoch)
+          .setTopics(new util.ArrayList())
+
+        pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, items) => {
+          val topicPart = new AlterIsrRequestTopics()
+            .setName(topic)
+            .setPartitions(new util.ArrayList())
+          message.topics().add(topicPart)
+          items.foreach(item => {
+            topicPart.partitions().add(new AlterIsrRequestPartitions()
+              .setPartitionIndex(item.topicPartition.partition())
+              .setLeaderId(item.leaderAndIsr.leader)
+              .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+              .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+              .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+            )
+          })
+        })
+
+        def responseHandler(response: ClientResponse): Unit = {

Review comment:
       Turns out we do need to clear it. In the common case, the controller will still send out LeaderAndIsr following a failed ISR update. However, during testing I did find a case where a LeaderAndIsr failed to go out, leading to a stuck leader (see the sketch after this list):
   
   * ISR = {1,2}
   * Broker 3 comes up
   * Leader sends ISR = {1,2,3}
   * Broker 3 goes down
   * Controller handles broker shutdown
   * Controller handles AlterIsr, resulting LeaderAndIsr batch fails to send because broker 3 is offline
   * Broker 3 comes up, leader still waiting on LeaderAndIsr and never gets it -- ISR gets stuck
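   
   For reference, a minimal sketch of the clearing behavior, using hypothetical stand-in types (`PendingIsrUpdates`, `AlterIsrResult`, and the `TopicPartition` here are simplified illustrations, not the actual ClientResponse/AlterIsrResponse classes -- the real handler body isn't shown in this diff):
   
   ```scala
   import scala.collection.mutable
   
   // Hypothetical stand-ins for the Kafka types involved; the real handler
   // receives a ClientResponse wrapping an AlterIsrResponse.
   case class TopicPartition(topic: String, partition: Int)
   case class AlterIsrResult(error: Option[String], partitions: Seq[TopicPartition])
   
   class PendingIsrUpdates {
     private val pending = mutable.Map[TopicPartition, Int]()
   
     def enqueue(tp: TopicPartition, zkVersion: Int): Unit =
       pending synchronized { pending(tp) = zkVersion }
   
     // Clear in-flight entries on *any* response, success or failure. If the
     // controller's follow-up LeaderAndIsr never arrives (e.g. the target
     // broker was offline when the batch went out), the leader can enqueue a
     // fresh AlterIsr instead of waiting forever on state that never changes.
     def handleResponse(result: AlterIsrResult): Unit = pending synchronized {
       result.partitions.foreach(pending.remove)
       result.error.foreach(e => println(s"AlterIsr failed ($e); cleared pending updates so they can be retried"))
     }
   }
   
   object Demo extends App {
     val tp = TopicPartition("foo", 0)
     val state = new PendingIsrUpdates
     state.enqueue(tp, zkVersion = 1)
     // A failed response still clears the pending entry, avoiding the stuck-ISR case above.
     state.handleResponse(AlterIsrResult(error = Some("NOT_CONTROLLER"), partitions = Seq(tp)))
   }
   ```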
   
    




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

