mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r474106591



##########
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##########
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent from the controller.
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 60000L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager,
+                                 val zkClient: KafkaZkClient,
+                                 val scheduler: Scheduler,
+                                 val brokerId: Int,
+                                 val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+    pendingIsrUpdates synchronized {
+      pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+      lastIsrChangeMs.set(System.currentTimeMillis())
+      // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening
+      // in fast succession
+      if (scheduledRequest.isEmpty) {
+        scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+      }
+    }
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+    pendingIsrUpdates synchronized {
+      // when we get a new LeaderAndIsr, we clear out any pending requests
+      pendingIsrUpdates.remove(topicPartition)
+    }
+  }
+
+  override def startup(): Unit = {
+    controllerChannelManager.start()
+  }
+
+  override def shutdown(): Unit = {
+    controllerChannelManager.shutdown()
+  }
+
+  private def propagateIsrChanges(): Unit = {
+    val now = System.currentTimeMillis()
+    pendingIsrUpdates synchronized {
+      if (pendingIsrUpdates.nonEmpty) {
+        // Max ISRs to send?
+        val message = new AlterIsrRequestData()
+          .setBrokerId(brokerId)
+          .setBrokerEpoch(brokerEpoch)
+          .setTopics(new util.ArrayList())
+
+        pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, items) => {
+          val topicPart = new AlterIsrRequestTopics()
+            .setName(topic)
+            .setPartitions(new util.ArrayList())
+          message.topics().add(topicPart)
+          items.foreach(item => {
+            topicPart.partitions().add(new AlterIsrRequestPartitions()
+              .setPartitionIndex(item.topicPartition.partition())
+              .setLeaderId(item.leaderAndIsr.leader)
+              .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+              .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+              .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+            )
+          })
+        })
+
+        def responseHandler(response: ClientResponse): Unit = {

Review comment:
   I found a race during the system tests when a broker is shutting down. The controller handles the shutdown before it handles an AlterIsr. If the proposed ISR includes the now-offline replica, the controller refuses to apply that ISR change and returns an error for that partition. It then sends out the current LeaderAndIsr.
   
   The problem is that the broker ignores this LeaderAndIsr since it has the same leader epoch. This is easy enough to fix: we can bump the leader epoch in the controller (and ZK) before sending it out.
   
   However, there is still the case where the ZK update fails. I think we should treat this the same way as an offline replica. If we simply return an error in the AlterIsr response and let the leader reset its pending ISR state, the leader will just retry with stale metadata and the update will fail again.
   
   I think in all of these error cases we must bump the leader epoch to force the leader to accept the new LeaderAndIsr. Thoughts?
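   
   For illustration only, here is a minimal sketch (not the code under review) of the leader-side handling described above: on a per-partition AlterIsr error, clear the in-flight state and wait for the controller's epoch-bumped LeaderAndIsr rather than retrying with stale metadata. `AlterIsrPartitionError` and `PendingIsrTracker` are hypothetical stand-ins, not types from this PR.

```scala
// Hypothetical sketch: these names are illustrative and not part of the PR under review.
import scala.collection.mutable

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors

// Stand-in for the per-partition result the controller returns in the AlterIsr response.
final case class AlterIsrPartitionError(topicPartition: TopicPartition, error: Errors)

final class PendingIsrTracker {
  private val pending = mutable.Set[TopicPartition]()

  // Record that an AlterIsr for this partition is in flight.
  def markPending(tp: TopicPartition): Unit = pending synchronized {
    pending += tp
  }

  // Called from the AlterIsr response handler.
  def handleResults(results: Seq[AlterIsrPartitionError]): Unit = pending synchronized {
    results.foreach { result =>
      result.error match {
        case Errors.NONE =>
          // The controller committed the new ISR; nothing more to do for this partition.
          pending -= result.topicPartition
        case error =>
          // The controller rejected the update (e.g. the proposed ISR contained a
          // now-offline replica, or the ZK write failed). Clear the in-flight state and
          // wait for the epoch-bumped LeaderAndIsr instead of retrying with stale metadata.
          pending -= result.topicPartition
          println(s"AlterIsr rejected for ${result.topicPartition}: $error")
      }
    }
  }
}
```

   With this approach the leader never resends the rejected state; once the controller bumps the leader epoch, the subsequent LeaderAndIsr is accepted and the pending entry stays clear until the next ISR change.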



