hachikuji commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r543057645



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -62,27 +61,38 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private val lastIsrPropagationMs = new AtomicLong(0)
 
-  override def start(): Unit = {
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
-  }
+  override def start(): Unit = { }
 
   override def enqueue(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) {
+      if (inflightRequest.compareAndSet(false, true)) {
+        // optimistically set the inflight flag even though we haven't sent the request yet
+        scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)

Review comment:
The linger approach feels a tad clumsy, and I wonder if it is necessary. With
the constraint of only one inflight request at a time, I think we will end up
batching effectively as the rate of requests goes up. Most importantly, we will
still batch effectively when the controller is slow to respond. Would sending
immediately, without the linger, be complex to implement?
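
To make the suggestion concrete, here is a minimal sketch of batching that relies only on the single-inflight constraint, with no linger. It is not the actual `AlterIsrManager` code: `SingleInflightBatcher`, `send`, and `onResponse` are hypothetical names, partitions and items are simplified to strings, and the queue is drained at send time for brevity.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the manager; `send` models the request to the controller.
class SingleInflightBatcher(send: Seq[String] => Unit) {
  private val unsent = new ConcurrentHashMap[String, String]()
  private val inflight = new AtomicBoolean(false)

  // Returns false if an update for this partition is already queued.
  def enqueue(partition: String, item: String): Boolean = {
    val added = unsent.putIfAbsent(partition, item) == null
    if (added) maybeSend() // no linger: try to send right away
    added
  }

  // Assumed to be invoked when the controller response arrives.
  def onResponse(): Unit = {
    inflight.set(false)
    maybeSend() // everything queued while we were waiting goes out as one batch
  }

  // Remove and return everything currently queued.
  private def drain(): List[String] = {
    val batch = ArrayBuffer.empty[String]
    val it = unsent.entrySet.iterator
    while (it.hasNext) {
      batch += it.next().getValue
      it.remove()
    }
    batch.toList
  }

  @tailrec
  private def maybeSend(): Unit =
    if (!unsent.isEmpty && inflight.compareAndSet(false, true)) {
      val batch = drain()
      if (batch.nonEmpty) send(batch)
      else {
        // Another thread drained the queue first; release the flag and re-check.
        inflight.set(false)
        maybeSend()
      }
    }
}
```

In this sketch the first update is sent on its own, and anything enqueued before `onResponse()` is called accumulates into the next request, so batch size grows with both the request rate and the controller's response latency.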




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

