mumrah commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r542714371

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -62,27 +61,38 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne

   private val lastIsrPropagationMs = new AtomicLong(0)

-  override def start(): Unit = {
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
-  }
+  override def start(): Unit = { }

   override def enqueue(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) {
+      if (inflightRequest.compareAndSet(false, true)) {
+        // optimistically set the inflight flag even though we haven't sent the request yet
+        scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)

Review comment:
   The 50ms delay here acts as a simple linger time that lets ISR updates batch. If we scheduled the thread immediately after the first call to `enqueue`, we would likely send only a single partition in the AlterIsr request. With the small delay, partitions get a chance to accumulate and go out as one batch. If we find in practice that we're sending inefficient batches, we might consider adding a real linger time with a maximum delay.
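
   To make the batching behaviour concrete, here is a minimal standalone sketch of the pattern, not the code in this PR. `LingerBatcher`, `send`, and `lingerMs` are hypothetical names, and it uses a plain `ScheduledExecutorService` rather than Kafka's scheduler. It also clears the in-flight flag right after `send` returns, whereas a real implementation would presumably wait for the controller's response.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

// Hypothetical illustration of "linger then flush"; not Kafka's AlterIsrManagerImpl.
final class LingerBatcher[K, V](lingerMs: Long, send: Seq[(K, V)] => Unit) {
  private val unsent = new ConcurrentHashMap[K, V]()
  private val inflight = new AtomicBoolean(false)
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor()

  /** Returns false if an update for this key is already pending. */
  def enqueue(key: K, value: V): Boolean = {
    if (unsent.putIfAbsent(key, value) == null) {
      scheduleFlush()
      true
    } else {
      false
    }
  }

  // Only the caller that flips the flag schedules the one-shot flush, so
  // updates arriving during the linger window join the same batch.
  private def scheduleFlush(): Unit = {
    if (inflight.compareAndSet(false, true)) {
      scheduler.schedule(
        new Runnable { def run(): Unit = flush() },
        lingerMs,
        TimeUnit.MILLISECONDS)
    }
  }

  private def flush(): Unit = {
    val batch = ArrayBuffer.empty[(K, V)]
    val keys = unsent.keySet().iterator()
    while (keys.hasNext) {
      val k = keys.next()
      // remove returns null if the entry is already gone
      Option(unsent.remove(k)).foreach(v => batch += (k -> v))
    }
    try {
      if (batch.nonEmpty) send(batch.toSeq)
    } finally {
      inflight.set(false)
      // Anything enqueued while we were sending gets its own linger window.
      if (!unsent.isEmpty) scheduleFlush()
    }
  }

  def shutdown(): Unit = scheduler.shutdown()
}
```

   With a linger of 50ms, two `enqueue` calls made back to back for different keys would normally reach `send` together as one batch, while a linger of 0 would tend to send the first key on its own. A second `enqueue` for a key that is still pending returns false, mirroring the `putIfAbsent` check in the diff.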