hachikuji commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r554105440



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -122,43 +121,47 @@ class DefaultAlterIsrManager(
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, 
TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == 
null
+    if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, 
alterIsrItem) == null) {
+      if (inflightRequest.compareAndSet(false, true)) {
+        // optimistically set the inflight flag even though we haven't sent 
the request yet
+        scheduler.schedule("send-alter-isr", propagateIsrChanges, 1, -1, 
TimeUnit.MILLISECONDS)

Review comment:
       If we are only waiting 1ms, would it be simpler to call `propagateIsrChanges` directly? The same applies after receiving a response with no error.

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -122,43 +121,47 @@ class DefaultAlterIsrManager(
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, 
TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == 
null
+    if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, 
alterIsrItem) == null) {
+      if (inflightRequest.compareAndSet(false, true)) {

Review comment:
       This seems to introduce a race condition. Say we have an in-flight request. Is the following sequence possible?
   
   1. The response returns, and the IO thread calls `propagateIsrChanges` and sees an empty `unsentIsrUpdates`.
   2. A request thread calls `submit` and inserts a new item into `unsentIsrUpdates`.
   3. The request thread fails `compareAndSet` on `inflightRequest`, so it does not schedule a send.
   4. The IO thread clears `inflightRequest`.
   
   At that point the new item sits in `unsentIsrUpdates` with no request in flight and nothing scheduled to send it. It seems like we might need a lock.

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -116,63 +116,78 @@ class DefaultAlterIsrManager(
   private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new 
ConcurrentHashMap[TopicPartition, AlterIsrItem]()
 
   // Used to allow only one in-flight request at a time
-  private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+  @volatile
+  private var inflightRequest: Boolean = false
 
-  private val lastIsrPropagationMs = new AtomicLong(0)
+  // Protects the updates of the inflight flag and prevents new pending items 
from being submitted while we are
+  // preparing a request
+  private val inflightLock: ReentrantReadWriteLock = new 
ReentrantReadWriteLock()
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, 
TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == 
null
+    val (didSubmit, needsPropagate) = inReadLock(inflightLock) {
+      if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, 
alterIsrItem) == null) {
+        (true, !inflightRequest)
+      } else {
+        (false, false)
+      }
+    }
+    if (needsPropagate) {
+      propagateIsrChanges(true)
+    }
+    didSubmit
   }
 
   override def clearPending(topicPartition: TopicPartition): Unit = {
     unsentIsrUpdates.remove(topicPartition)
   }
 
-  private def propagateIsrChanges(): Unit = {
-    if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, 
true)) {
-      // Copy current unsent ISRs but don't remove from the map
-      val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]()
-      unsentIsrUpdates.values().forEach(item => 
inflightAlterIsrItems.append(item))
-
-      val now = time.milliseconds()
-      lastIsrPropagationMs.set(now)
-      sendRequest(inflightAlterIsrItems.toSeq)
+  private def propagateIsrChanges(checkInflight: Boolean): Unit = 
inWriteLock(inflightLock) {

Review comment:
       Do you think contention for this lock will be an issue? It seems unlikely to me. That makes me think we might be able to simplify the concurrency by replacing the read-write lock with a simple ReentrantLock. Then we could design this around two methods:
   
   ```scala
   def maybePropagateIsrChanges(): Unit = inLock(lock) {
    if (!inflightRequest) {
      ... 
    }
   }
   
   def clearInFlightRequest(): Unit = inLock(lock) {
     inflightRequest = false
   }
   ```
   What do you think?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to