mumrah commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r542714371



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -62,27 +61,38 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private val lastIsrPropagationMs = new AtomicLong(0)
 
-  override def start(): Unit = {
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
-  }
+  override def start(): Unit = { }
 
   override def enqueue(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) {
+      if (inflightRequest.compareAndSet(false, true)) {
+        // optimistically set the inflight flag even though we haven't sent the request yet
+        scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)

Review comment:
The 50ms delay here acts as a simple linger time that lets ISR updates
batch. If we scheduled the task to run immediately after the first call to
`enqueue`, we would likely send only a single partition in the AlterIsr
request. The small delay gives other partitions a chance to accumulate so
they can be sent as one batch.

If we find in practice that the batches are inefficient, we might consider
adding a real linger time with a maximum delay.
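
To make the batching effect concrete, here is a minimal standalone sketch of the pattern. It is not the code in this PR: `LingerBatcher`, `lingerMs`, and `send` are hypothetical names, partitions and updates are plain strings, and a `java.util.concurrent` executor stands in for the Kafka scheduler. The first successful `enqueue` flips the in-flight flag and schedules a single delayed flush; later calls that arrive within the delay only add to the map.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the manager in the diff above.
// `send` is an assumed callback that receives one batch of (partition, update) pairs.
class LingerBatcher(lingerMs: Long, send: Seq[(String, String)] => Unit) {
  private val unsent = new ConcurrentHashMap[String, String]()
  private val inflight = new AtomicBoolean(false)
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Returns false if an update for this partition is already pending.
  def enqueue(partition: String, update: String): Boolean = {
    if (unsent.putIfAbsent(partition, update) == null) {
      scheduleFlushIfIdle()
      true
    } else {
      false
    }
  }

  // Only the caller that wins the compareAndSet schedules the one-shot flush,
  // so updates arriving during the linger window join the same batch.
  private def scheduleFlushIfIdle(): Unit = {
    if (inflight.compareAndSet(false, true)) {
      val task = new Runnable { def run(): Unit = flush() }
      scheduler.schedule(task, lingerMs, TimeUnit.MILLISECONDS)
    }
  }

  private def flush(): Unit = {
    // Drain everything that accumulated while the task was waiting.
    val batch = ArrayBuffer.empty[(String, String)]
    val it = unsent.entrySet().iterator()
    while (it.hasNext) {
      val entry = it.next()
      batch += ((entry.getKey, entry.getValue))
      it.remove()
    }
    inflight.set(false)
    // An update may have slipped in after the drain but before the flag was
    // cleared; make sure it still gets a flush scheduled.
    if (!unsent.isEmpty) scheduleFlushIfIdle()
    if (batch.nonEmpty) send(batch.toSeq)
  }

  def shutdown(): Unit = {
    scheduler.shutdown()
    scheduler.awaitTermination(1, TimeUnit.SECONDS)
  }
}
```

With `lingerMs = 0` the flush would typically run before a second partition is enqueued, which is the single-partition request described above. Unlike the real manager, this sketch clears the flag as soon as the batch is handed to `send` instead of waiting for a response.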




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

