hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r464705276



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2956,6 +2956,22 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
+    val alterIsrRequest = request.body[AlterIsrRequest]
+
+    authorizeClusterOperation(request, CLUSTER_ACTION);
+
+    // TODO do we need throttling for this response?

Review comment:
       Probably reasonable to handle it the same way other inter-broker RPCs are handled.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  // TODO is it okay to pull message classes down into the controller?
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+    //val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId())
+    /*if (brokerEpochOpt.isEmpty) {
+      info(s"Ignoring AlterIsr due to unknown broker ${alterIsrRequest.brokerId()}")
+      // TODO is INVALID_REQUEST a reasonable error here?

Review comment:
       Good question. Might be fair to assume the controller is correct and use STALE_BROKER_EPOCH. Once KIP-500 is all done, it would be totally fair since the controller will be guaranteed to have the latest state. The other question is what the broker should do if it sees STALE_BROKER_EPOCH...
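A minimal sketch of the check suggested above, assuming the controller's `liveBrokerIdAndEpochs` view (modeled here as a plain `Map[Int, Long]`) is treated as authoritative, so an unknown broker is reported exactly like a mismatched epoch. `EpochValidation` and its result type are hypothetical stand-ins; the real code would set an error code from Kafka's `Errors` on the response.

```scala
object EpochValidation {
  // Hypothetical stand-ins for the two outcomes this sketch needs; in Kafka the
  // response would carry an error code from org.apache.kafka.common.protocol.Errors.
  sealed trait Result
  case object Ok extends Result
  case object StaleBrokerEpoch extends Result

  // Treats the controller's view of live brokers as authoritative: a broker the
  // controller does not know about is reported the same way as a mismatched epoch.
  def checkBrokerEpoch(liveBrokerEpochs: Map[Int, Long], brokerId: Int, brokerEpoch: Long): Result =
    liveBrokerEpochs.get(brokerId) match {
      case Some(epoch) if epoch == brokerEpoch => Ok
      case _                                   => StaleBrokerEpoch
    }
}
```

With this rule the broker has a single epoch-related error to react to, which is where the open question about STALE_BROKER_EPOCH handling on the broker side comes in.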

##########
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##########
@@ -0,0 +1,121 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 60000L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr, callback: Errors => Unit)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager,
+                                 val zkClient: KafkaZkClient,
+                                 val scheduler: Scheduler,
+                                 val brokerId: Int,
+                                 val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Queue[AlterIsrItem] = new mutable.Queue[AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+    pendingIsrUpdates synchronized {
+      pendingIsrUpdates += alterIsrItem
+      lastIsrChangeMs.set(System.currentTimeMillis())
+    }
+  }
+
+  override def startup(): Unit = {
+    scheduler.schedule("alter-isr-send", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)

Review comment:
       Hmm.. This adds a delay of 2.5s to every ISR change, which is a bit annoying. I guess the point is to allow batching? I think a better approach might be to send requests immediately on arrival, but set a limit on the maximum number of in-flight requests (maybe just 1) and let the changes accumulate when there is a request in-flight. Then we can still get a big batching benefit when there are a large number of ISR changes that need to be sent in a hurry.
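A minimal sketch of the alternative described above, assuming a generic `send(batch, onComplete)` function stands in for the real `BrokerToControllerChannelManager` call. `InFlightLimitedSender` and its parameters are hypothetical names. Each update goes out immediately if nothing is in flight; otherwise it waits and is folded into the next request when the outstanding response arrives.

```scala
import scala.collection.mutable

// Hypothetical sketch: `send` stands in for the real controller channel call and
// must invoke `onComplete` once the response (or error) for that batch arrives.
final class InFlightLimitedSender[T](send: (Seq[T], () => Unit) => Unit) {
  private val pending = mutable.ArrayBuffer.empty[T]
  private var inFlight = false

  // Queue the update; if no request is outstanding, send it right away.
  def enqueue(item: T): Unit = {
    val batch = synchronized {
      pending += item
      takeBatchIfIdle()
    }
    dispatch(batch)
  }

  // Called when the outstanding request completes: flush whatever accumulated.
  private def onResponse(): Unit = {
    val batch = synchronized {
      inFlight = false
      takeBatchIfIdle()
    }
    dispatch(batch)
  }

  // Caller must hold the lock. Drains all pending items into a single batch and
  // marks a request as in flight, so at most one is ever outstanding.
  private def takeBatchIfIdle(): Option[Seq[T]] =
    if (!inFlight && pending.nonEmpty) {
      inFlight = true
      val batch = pending.toList
      pending.clear()
      Some(batch)
    } else None

  // Send outside the lock so the lock is never held while handing a batch to `send`.
  private def dispatch(batch: Option[Seq[T]]): Unit =
    batch.foreach(b => send(b, () => onResponse()))
}
```

Under a burst, the first change is sent at once and everything that arrives while that request is outstanding goes out together in the next one, so batching still happens without a fixed scheduling delay.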

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  // TODO is it okay to pull message classes down into the controller?
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+    //val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId())
+    /*if (brokerEpochOpt.isEmpty) {
+      info(s"Ignoring AlterIsr due to unknown broker ${alterIsrRequest.brokerId()}")
+      // TODO is INVALID_REQUEST a reasonable error here?
+      callback.apply(new AlterIsrResponseData().setErrorCode(Errors.INVALID_REQUEST.code))
+      return
+    }
+
+    if (!brokerEpochOpt.contains(alterIsrRequest.brokerEpoch())) {
+      info(s"Ignoring AlterIsr due to stale broker epoch ${alterIsrRequest.brokerEpoch()} for broker ${alterIsrRequest.brokerId()}")
+      callback.apply(new AlterIsrResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
+      return
+    }*/
+
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    val resp = new AlterIsrResponseData()
+    resp.setTopics(new util.ArrayList())
+
+    alterIsrRequest.topics().forEach(topicReq => {
+      val topicResp = new AlterIsrResponseTopics()
+        .setName(topicReq.name())
+        .setPartitions(new util.ArrayList())
+      resp.topics().add(topicResp)
+
+      topicReq.partitions().forEach(partitionReq => {
+        val partitionResp = new AlterIsrResponsePartitions()
+          .setPartitionIndex(partitionReq.partitionIndex())
+        topicResp.partitions().add(partitionResp)
+
+        // For each partition whose ISR we are altering, let's do some upfront validation for the broker response

Review comment:
       Hmm.. Not sure it's worth doing these validations up-front. These checks could fail between the time that the event is enqueued and the time it is processed.
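A minimal sketch of the point above, assuming a simplified single-threaded controller event queue. `ControllerSketch`, its members, and the leader-epoch check are hypothetical illustrations, not Kafka's `ControllerEventManager` or the validation in this PR. The check runs inside the queued event, so a change that lands after enqueue but before processing is still caught.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of a controller that queues events and later
// processes them one at a time; not Kafka's actual ControllerEventManager.
final class ControllerSketch {
  // Controller's current view of each partition's leader epoch.
  val leaderEpochs = mutable.Map.empty[String, Int]
  private val events = mutable.Queue.empty[() => Unit]

  // Drain the queue, running each event against the state at that moment.
  def processAll(): Unit =
    while (events.nonEmpty) {
      val next = events.dequeue()
      next()
    }

  // The request is only enqueued here; validation runs when the event is
  // processed, not when the request is first received.
  def alterIsr(partition: String, leaderEpoch: Int)(callback: Either[String, Unit] => Unit): Unit = {
    val event: () => Unit = () =>
      leaderEpochs.get(partition) match {
        case None                                    => callback(Left("unknown partition"))
        case Some(current) if current != leaderEpoch => callback(Left("stale leader epoch"))
        case Some(_)                                 => callback(Right(()))
      }
    events.enqueue(event)
  }
}
```

Any result computed before `processAll` would reflect stale state, which is why doing the same checks at enqueue time adds little.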




----------------------------------------------------------------
This is an automated message from the Apache Git Service.