hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r469388095



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -748,7 +755,7 @@ class Partition(val topicPartition: TopicPartition,
     leaderLogIfLocal match {
       case Some(leaderLog) =>
         // keep the current immutable replica list reference
-        val curInSyncReplicaIds = inSyncReplicaIds
+        val curInSyncReplicaIds = effectiveInSyncReplicaIds

Review comment:
       Related to the other comment, but we need to be careful with the min.isr 
check below. I think it is correct to wait for `effectiveInSyncReplicaIds` 
before acknowledging the produce request, but we should probably use the size 
of `inSyncReplicaIds` in the min.isr check since that is the only set we can 
guarantee.
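
To make the distinction concrete, here is a minimal sketch of the two checks, assuming a simplified `IsrState` holder. The names `IsrState`, `allCaughtUp` and `hasEnoughReplicas` are hypothetical and are not part of `Partition.scala`:

```scala
object MinIsrSketch {
  // Hypothetical stand-in for the partition's ISR fields; not Kafka's actual class.
  final case class IsrState(inSyncReplicaIds: Set[Int], pendingInSyncReplicaIds: Set[Int]) {
    // Committed ISR plus any replicas from a pending expansion.
    def effectiveInSyncReplicaIds: Set[Int] = inSyncReplicaIds | pendingInSyncReplicaIds
  }

  // Acknowledgement waits on the effective ISR: every member must have reached requiredOffset.
  def allCaughtUp(state: IsrState, logEndOffsets: Map[Int, Long], requiredOffset: Long): Boolean =
    state.effectiveInSyncReplicaIds.forall(id => logEndOffsets.getOrElse(id, -1L) >= requiredOffset)

  // min.isr is enforced against the committed ISR only, since that is the set we can guarantee.
  def hasEnoughReplicas(state: IsrState, minIsr: Int): Boolean =
    state.inSyncReplicaIds.size >= minIsr
}
```

With `inSyncReplicaIds = Set(1, 2)` and a pending expansion to `Set(1, 2, 3)`, a `min.isr` of 3 would not be satisfied under this sketch, even though the produce request still waits for replica 3.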

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AlterIsrRequest.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public class AlterIsrRequest extends AbstractRequest {
+
+    private final AlterIsrRequestData data;
+
+    public AlterIsrRequest(AlterIsrRequestData data, short apiVersion) {
+        super(ApiKeys.ALTER_ISR, apiVersion);
+        this.data = data;
+    }
+
+    public AlterIsrRequestData data() {
+        return data;
+    }
+
+    @Override
+    protected Struct toStruct() {
+        return data.toStruct(version());
+    }
+
+    /**
+     * Get an error response for a request with specified throttle time in the 
response if applicable
+     *
+     * @param throttleTimeMs

Review comment:
       nit: maybe drop the parameters if they do not need to be documented

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -255,6 +255,10 @@ class Partition(val topicPartition: TopicPartition,
 
   def isAddingReplica(replicaId: Int): Boolean = 
assignmentState.isAddingReplica(replicaId)
 
+  // For advancing the HW we assume the largest ISR even if the controller 
hasn't made the change yet
+  // This set includes the latest ISR (as we learned from LeaderAndIsr) and 
any replicas from a pending ISR expansion
+  def effectiveInSyncReplicaIds: Set[Int] = inSyncReplicaIds | 
pendingInSyncReplicaIds

Review comment:
       We might need to be careful about performance here since this would get 
called on every follower fetch.
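
One possible way to keep the read path cheap is to compute the union only when one of the two sets changes. This is only a sketch under that assumption; `IsrHolder` and its methods are hypothetical names, not existing Kafka code:

```scala
object EffectiveIsrCacheSketch {
  // Hypothetical holder that recomputes the union on writes, so reads are a plain field access.
  final class IsrHolder(initialIsr: Set[Int]) {
    @volatile private var committed: Set[Int] = initialIsr
    @volatile private var pending: Set[Int] = initialIsr
    @volatile private var effective: Set[Int] = initialIsr

    // Called on every follower fetch: no allocation, no set union.
    def effectiveInSyncReplicaIds: Set[Int] = effective

    // Called when the leader proposes an ISR change.
    def updatePending(newPending: Set[Int]): Unit = synchronized {
      pending = newPending
      effective = committed | pending
    }

    // Called when a new committed ISR arrives; resets the pending set as well.
    def updateCommitted(newIsr: Set[Int]): Unit = synchronized {
      committed = newIsr
      pending = newIsr
      effective = newIsr
    }
  }
}
```

ISR changes should be rare compared with follower fetches, so paying for the union on the write side seems like the better trade.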

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -485,13 +490,11 @@ class Partition(val topicPartition: TopicPartition,
   def makeLeader(partitionState: LeaderAndIsrPartitionState,

Review comment:
       There is a "classic" edge case in Kafka which goes as follows:
   
   1. Leader is 1, ISR is [1, 2, 3]
   2. Broker 3 begins controlled shutdown. While awaiting shutdown, it 
continues fetching.
   3. Controller bumps epoch and shrinks ISR to [1, 2] and notifies replicas
   4. Before controlled shutdown completes and 3 stops fetching, the leader 
adds it back to the ISR.
   
   This bug was fixed by KIP-320 which added epoch validation to the Fetch API. 
After shrinking the ISR in step 3, the controller will send `LeaderAndIsr` with 
the updated epoch to [1, 2] and `StopReplica` to [3]. So 3 will not send any 
fetches with the updated epoch, which means it's impossible for the leader to 
add 3 back after observing the shrink to [1, 2]. 
   
   I just want to confirm that the above is correct and check whether `AlterIsr` 
changes it in any way. I think the answer is no as long as ISR expansion is 
_only_ done in response to a fetch request, but it's worth double-checking.
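
The argument above can be modelled in a few lines. This is a simplified sketch rather than the broker's fetch path: `validateFetch` and `mayExpandIsr` are hypothetical names, and the result types only mirror the `FENCED_LEADER_EPOCH` and `UNKNOWN_LEADER_EPOCH` errors from KIP-320:

```scala
object FetchEpochSketch {
  sealed trait FetchResult
  case object Accepted extends FetchResult
  case object FencedLeaderEpoch extends FetchResult
  case object UnknownLeaderEpoch extends FetchResult

  // Simplified leader-side validation: the fetch must carry the leader's current epoch.
  def validateFetch(leaderEpoch: Int, fetchEpoch: Int): FetchResult =
    if (fetchEpoch < leaderEpoch) FencedLeaderEpoch
    else if (fetchEpoch > leaderEpoch) UnknownLeaderEpoch
    else Accepted

  // ISR expansion is only considered while handling an accepted fetch.
  def mayExpandIsr(leaderEpoch: Int, fetchEpoch: Int, caughtUp: Boolean): Boolean =
    validateFetch(leaderEpoch, fetchEpoch) == Accepted && caughtUp
}
```

In the scenario above, broker 3 keeps fetching with the old epoch after step 3, so `mayExpandIsr` is false for it even if it is fully caught up.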

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1756,6 +1761,141 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    alterIsrRequest.topics().forEach(topicReq => 
topicReq.partitions().forEach(partitionReq => {
+      val tp = new TopicPartition(topicReq.name(), 
partitionReq.partitionIndex())
+      val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+      isrsToAlter.put(tp, new LeaderAndIsr(partitionReq.leaderId(), 
partitionReq.leaderEpoch(), newIsr, partitionReq.currentIsrVersion()))
+    }))
+
+    def responseCallback(results: Either[Map[TopicPartition, Errors], 
Errors]): Unit = {
+      val resp = new AlterIsrResponseData()
+      results match {
+        case Right(error) =>
+          resp.setErrorCode(error.code())

Review comment:
       nit: remove parentheses for simple getters like `code`. There are a few 
more of these.

##########
File path: clients/src/main/java/org/apache/kafka/common/errors/InvalidIsrVersionException.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class InvalidIsrVersionException extends ApiException {

Review comment:
       With KIP-500, I imagine there could be other cases where we end up 
using optimistic concurrency control. Does it make sense to make this error a 
little more generic? Maybe `INVALID_UPDATE_VERSION` or something like that.
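
For context, the pattern in question is a version-checked update that is not specific to the ISR. A generic sketch, assuming hypothetical names (`VersionedValue`, `Updated`, `InvalidUpdateVersion`) that do not exist in the codebase:

```scala
object VersionedUpdateSketch {
  sealed trait UpdateResult
  final case class Updated(newVersion: Int) extends UpdateResult
  case object InvalidUpdateVersion extends UpdateResult

  // Hypothetical versioned cell: an update succeeds only if the caller saw the latest version.
  final class VersionedValue[A](initial: A) {
    private var value: A = initial
    private var version: Int = 0

    def read: (A, Int) = synchronized((value, version))

    def update(expectedVersion: Int, newValue: A): UpdateResult = synchronized {
      if (expectedVersion != version) InvalidUpdateVersion
      else {
        value = newValue
        version += 1
        Updated(version)
      }
    }
  }
}
```

Nothing in the failure case refers to the ISR, which is the reason a generic error name would fit.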

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -990,6 +997,7 @@ class Partition(val topicPartition: TopicPartition,
       leaderLogIfLocal match {
         case Some(leaderLog) =>
           val minIsr = leaderLog.config.minInSyncReplicas
+          // TODO is it safe to use pending ISR here?

Review comment:
       I think the answer is no. The pending ISR set is not guaranteed, so we 
cannot depend on it to enforce min.isr.

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -691,6 +700,7 @@ class Partition(val topicPartition: TopicPartition,
     else
       assignmentState = SimpleAssignmentState(assignment)
     inSyncReplicaIds = isr
+    pendingInSyncReplicaIds = isr

Review comment:
       The usage is a bit surprising given the "pending" name. I wonder if it 
would be clearer if we used a type of `Option[Set[Int]]` so that we could use 
`None` when there is no pending ISR change.
   
   One more thing. It's worth double-checking the threading assumptions here. 
It looks like `updateAssignmentAndIsr` is only called while holding the write 
side of `leaderIsrUpdateLock`. On the other hand, I don't see any lock held in 
`updateFollowerFetchState`. It's worth stepping through that logic to make sure 
that we do not depend on `inSyncReplicaIds` and `pendingInSyncReplicaIds` 
getting set atomically.
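
One way to address both points at once would be to keep the two sets in a single immutable value behind one volatile reference, so a reader without the lock always sees a consistent pair. A rough sketch, assuming hypothetical names (`IsrState`, `PartitionIsr`, `proposeIsr`, `commitIsr`):

```scala
object IsrSnapshotSketch {
  // pendingIsr is None when there is no outstanding ISR change.
  final case class IsrState(isr: Set[Int], pendingIsr: Option[Set[Int]]) {
    def effectiveIsr: Set[Int] = pendingIsr.fold(isr)(isr | _)
  }

  // Hypothetical holder: writers swap the whole state, readers take one snapshot.
  final class PartitionIsr(initial: Set[Int]) {
    @volatile private var state: IsrState = IsrState(initial, None)

    // Lock-free read: both fields come from the same snapshot.
    def current: IsrState = state

    def proposeIsr(proposed: Set[Int]): Unit = synchronized {
      state = state.copy(pendingIsr = Some(proposed))
    }

    def commitIsr(newIsr: Set[Int]): Unit = synchronized {
      state = IsrState(newIsr, None)
    }
  }
}
```

With this shape, the fetch path would read `current` once and use that snapshot throughout, rather than reading the two fields separately.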

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -569,8 +575,8 @@ class Partition(val topicPartition: TopicPartition,
       val oldLeaderEpoch = leaderEpoch
       // record the epoch of the controller that made the leadership decision. 
This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper 
path
-      controllerEpoch = partitionState.controllerEpoch
 
+      info(s"Follower ignoring ISR for $topicPartition")

Review comment:
       nit: info feels a bit high for a message like this

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -807,7 +814,7 @@ class Partition(val topicPartition: TopicPartition,
       var newHighWatermark = leaderLog.logEndOffsetMetadata
       remoteReplicasMap.values.foreach { replica =>
         if (replica.logEndOffsetMetadata.messageOffset < 
newHighWatermark.messageOffset &&
-          (curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || 
inSyncReplicaIds.contains(replica.brokerId))) {
+          (curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || 
effectiveInSyncReplicaIds.contains(replica.brokerId))) {

Review comment:
       I think it's worth adding a comment in the cases where we rely on 
`effectiveInSyncReplicaIds` to explain why.

##########
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##########
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L

Review comment:
       nit: avoid loaded terminology like "blackout" (see 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-629%3A+Use+racially+neutral+terms+in+our+codebase).
 Do we actually need this or `IsrChangePropagationInterval` below?

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1210,28 +1218,20 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def expandIsr(newIsr: Set[Int]): Unit = {
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
-    val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
-    maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
-  }
+  private def expandIsr(newInSyncReplica: Int): Unit = {
+    pendingInSyncReplicaIds += newInSyncReplica
+    info(s"Adding new in-sync replica $newInSyncReplica. Pending ISR updated 
to [${pendingInSyncReplicaIds.mkString(",")}]")
 
-  private[cluster] def shrinkIsr(newIsr: Set[Int]): Unit = {
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
-    val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
-    maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
pendingInSyncReplicaIds.toList, zkVersion)
+    alterIsrChannelManager.enqueueIsrUpdate(AlterIsrItem(topicPartition, 
newLeaderAndIsr))
   }
 
-  private[cluster] def maybeUpdateIsrAndVersion(isr: Set[Int], zkVersionOpt: 
Option[Int]): Unit = {
-    zkVersionOpt match {
-      case Some(newVersion) =>
-        inSyncReplicaIds = isr
-        zkVersion = newVersion
-        info("ISR updated to [%s] and zkVersion updated to 
[%d]".format(isr.mkString(","), zkVersion))
+  private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
+    pendingInSyncReplicaIds --= outOfSyncReplicas

Review comment:
       I think the implementation here is actually different than what was in 
the model. Consider the following case:
   
   1) Initial state: isr=[1, 2], pendingIsr=[1, 2]
   2) Leader expands ISR. isr=[1, 2], pendingIsr=[1, 2, 3]
   3) Leader shrinks ISR. isr=[1, 2], pendingIsr=[1, 2]
   
   We don't know which of the updates in 2) or 3) will be accepted, but after 
3), we will not assume that broker 3 could be in the ISR, which could lead to a 
correctness violation if the update in 2) is accepted by the controller.
   
   In the model, we always assumed the maximal ISR across _any_ potential 
update to protect from this edge case. Maybe in the end it is simpler to not 
allow multiple in-flight updates.
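
For reference, the behaviour assumed in the model can be sketched as follows. `IsrTracker` is a hypothetical name, and clearing the in-flight list on `commit` is a simplification made for this example:

```scala
object MaximalIsrSketch {
  // Hypothetical tracker: remembers every proposed ISR that has not been resolved yet.
  final case class IsrTracker(committed: Set[Int], inFlight: Vector[Set[Int]] = Vector.empty) {
    // Record a proposed ISR (expansion or shrink) without forgetting earlier proposals.
    def propose(newIsr: Set[Int]): IsrTracker = copy(inFlight = inFlight :+ newIsr)

    // Union of the committed ISR and every in-flight proposal.
    def maximalIsr: Set[Int] = inFlight.foldLeft(committed)(_ | _)

    // Simplification: a newly committed ISR discards all in-flight proposals.
    def commit(newIsr: Set[Int]): IsrTracker = IsrTracker(newIsr)
  }
}
```

Applied to the case above, `IsrTracker(Set(1, 2)).propose(Set(1, 2, 3)).propose(Set(1, 2)).maximalIsr` still contains broker 3, whereas the current `pendingInSyncReplicaIds` would not.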
   

##########
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##########
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 60000L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+                                 val zkClient: KafkaZkClient,
+                                 val scheduler: Scheduler,
+                                 val brokerId: Int,
+                                 val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+    pendingIsrUpdates synchronized {
+      pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+      lastIsrChangeMs.set(System.currentTimeMillis())

Review comment:
       We should use `Time` here rather than calling 
`System.currentTimeMillis()` directly.
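
The benefit is mainly testability. The sketch below uses a hypothetical `Clock` trait as a stand-in for Kafka's `Time` interface, so that the example stays self-contained; `ManualClock` and `ChangeTracker` are also illustrative names only:

```scala
object InjectedClockSketch {
  import java.util.concurrent.atomic.AtomicLong

  // Hypothetical stand-in for an injectable time source.
  trait Clock { def milliseconds(): Long }

  object SystemClock extends Clock {
    def milliseconds(): Long = System.currentTimeMillis()
  }

  // Test clock that only moves when told to.
  final class ManualClock(start: Long) extends Clock {
    private val now = new AtomicLong(start)
    def milliseconds(): Long = now.get()
    def advance(ms: Long): Unit = { now.addAndGet(ms); () }
  }

  // The tracker takes its clock as a constructor argument instead of reading the system clock.
  final class ChangeTracker(clock: Clock) {
    private val lastIsrChangeMs = new AtomicLong(0)
    def recordChange(): Unit = lastIsrChangeMs.set(clock.milliseconds())
    def lastChangeMs: Long = lastIsrChangeMs.get()
  }
}
```

A test can then advance the clock by exactly 50 ms and assert on the recorded timestamp without sleeping.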

##########
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##########
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 60000L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+                                 val zkClient: KafkaZkClient,
+                                 val scheduler: Scheduler,
+                                 val brokerId: Int,
+                                 val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+    pendingIsrUpdates synchronized {
+      pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+      lastIsrChangeMs.set(System.currentTimeMillis())
+      // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+      // in fast succession
+      if (scheduledRequest.isEmpty) {
+        scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+      }
+    }
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+    pendingIsrUpdates synchronized {
+      // when we get a new LeaderAndIsr, we clear out any pending requests
+      pendingIsrUpdates.remove(topicPartition)
+    }
+  }
+
+  override def startup(): Unit = {
+    controllerChannelManager.start()
+  }
+
+  override def shutdown(): Unit = {
+    controllerChannelManager.shutdown()
+  }
+
+  private def propagateIsrChanges(): Unit = {
+    val now = System.currentTimeMillis()
+    pendingIsrUpdates synchronized {
+      if (pendingIsrUpdates.nonEmpty) {
+        // Max ISRs to send?
+        val message = new AlterIsrRequestData()
+          .setBrokerId(brokerId)
+          .setBrokerEpoch(brokerEpoch)
+          .setTopics(new util.ArrayList())
+
+        
pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, 
items) => {
+          val topicPart = new AlterIsrRequestTopics()
+            .setName(topic)
+            .setPartitions(new util.ArrayList())
+          message.topics().add(topicPart)
+          items.foreach(item => {
+            topicPart.partitions().add(new AlterIsrRequestPartitions()
+              .setPartitionIndex(item.topicPartition.partition())
+              .setLeaderId(item.leaderAndIsr.leader)
+              .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+              .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+              .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+            )
+          })
+        })
+
+        def responseHandler(response: ClientResponse): Unit = {

Review comment:
       I think the basic approach here is to ignore successful responses and 
wait for the `LeaderAndIsr` update. I am wondering how we should handle the 
case when the update failed. Say for example that our update fails with the 
INVALID_VERSION error. Inside `Partition`, we will still have the pendingIsr 
set. Do we need to clear it? How about other errors?

##########
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##########
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 60000L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+                                 val zkClient: KafkaZkClient,
+                                 val scheduler: Scheduler,
+                                 val brokerId: Int,
+                                 val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+    pendingIsrUpdates synchronized {
+      pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+      lastIsrChangeMs.set(System.currentTimeMillis())
+      // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+      // in fast succession
+      if (scheduledRequest.isEmpty) {
+        scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+      }
+    }
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+    pendingIsrUpdates synchronized {
+      // when we get a new LeaderAndIsr, we clear out any pending requests
+      pendingIsrUpdates.remove(topicPartition)

Review comment:
       Removal from this set won't prevent `BrokerToControllerRequestThread` 
from retrying in-flight requests. I'm considering whether we should have a way 
to cancel requests that we are still awaiting.

##########
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##########
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 60000L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+                                 val zkClient: KafkaZkClient,
+                                 val scheduler: Scheduler,
+                                 val brokerId: Int,
+                                 val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {

Review comment:
       The broker epoch is not a constant. It gets reinitialized whenever the 
broker has to create a new session.
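
One option would be to pass a supplier instead of a value, so that the epoch is read each time a request is built. A minimal sketch, with hypothetical names (`RequestHeader`, `RequestBuilder`, `brokerEpochSupplier`) standing in for the real request types:

```scala
object BrokerEpochSupplierSketch {
  // Hypothetical, simplified view of the fields that identify the sending broker.
  final case class RequestHeader(brokerId: Int, brokerEpoch: Long)

  // The epoch is looked up at build time rather than captured at construction time.
  final class RequestBuilder(brokerId: Int, brokerEpochSupplier: () => Long) {
    def build(): RequestHeader = RequestHeader(brokerId, brokerEpochSupplier())
  }
}
```

A request built after the broker creates a new session would then carry the new epoch, provided the supplier reads it from wherever the broker keeps the current value.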

##########
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##########
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util

Review comment:
       Missing license header in this file.

##########
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##########
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 60000L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+                                 val zkClient: KafkaZkClient,
+                                 val scheduler: Scheduler,
+                                 val brokerId: Int,
+                                 val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+    pendingIsrUpdates synchronized {
+      pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+      lastIsrChangeMs.set(System.currentTimeMillis())
+      // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+      // in fast succession
+      if (scheduledRequest.isEmpty) {
+        scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+      }
+    }
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+    pendingIsrUpdates synchronized {
+      // when we get a new LeaderAndIsr, we clear out any pending requests
+      pendingIsrUpdates.remove(topicPartition)
+    }
+  }
+
+  override def startup(): Unit = {
+    controllerChannelManager.start()
+  }
+
+  override def shutdown(): Unit = {
+    controllerChannelManager.shutdown()
+  }
+
+  private def propagateIsrChanges(): Unit = {
+    val now = System.currentTimeMillis()
+    pendingIsrUpdates synchronized {
+      if (pendingIsrUpdates.nonEmpty) {
+        // Max ISRs to send?
+        val message = new AlterIsrRequestData()
+          .setBrokerId(brokerId)
+          .setBrokerEpoch(brokerEpoch)
+          .setTopics(new util.ArrayList())
+
+        
pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, 
items) => {
+          val topicPart = new AlterIsrRequestTopics()
+            .setName(topic)
+            .setPartitions(new util.ArrayList())
+          message.topics().add(topicPart)
+          items.foreach(item => {
+            topicPart.partitions().add(new AlterIsrRequestPartitions()
+              .setPartitionIndex(item.topicPartition.partition())
+              .setLeaderId(item.leaderAndIsr.leader)
+              .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+              .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+              .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+            )
+          })
+        })
+
+        def responseHandler(response: ClientResponse): Unit = {
+          
println(response.responseBody().toString(response.requestHeader().apiVersion()))
+          val body: AlterIsrResponse = 
response.responseBody().asInstanceOf[AlterIsrResponse]
+          val data: AlterIsrResponseData = body.data()
+          Errors.forCode(data.errorCode()) match {
+            case Errors.NONE => info(s"Controller handled AlterIsr request")
+            case e: Errors => warn(s"Controller returned an error when 
handling AlterIsr request: $e")
+          }
+          data.topics().forEach(topic => {
+            topic.partitions().forEach(topicPartition => {
+              Errors.forCode(topicPartition.errorCode()) match {
+                case Errors.NONE => info(s"Controller handled AlterIsr for 
$topicPartition")
+                case e: Errors => warn(s"Controlled had an error handling 
AlterIsr for $topicPartition: $e")
+              }
+            })
+          })
+        }
+
+        info("Sending AlterIsr to controller")

Review comment:
       nit: this would be more useful at debug level, with request details 
added to the message

##########
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##########
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 60000L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager,
+                                 val zkClient: KafkaZkClient,
+                                 val scheduler: Scheduler,
+                                 val brokerId: Int,
+                                 val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+    pendingIsrUpdates synchronized {
+      pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+      lastIsrChangeMs.set(System.currentTimeMillis())
+      // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening
+      // in fast succession
+      if (scheduledRequest.isEmpty) {
+        scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+      }
+    }
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+    pendingIsrUpdates synchronized {
+      // when we get a new LeaderAndIsr, we clear out any pending requests
+      pendingIsrUpdates.remove(topicPartition)
+    }
+  }
+
+  override def startup(): Unit = {
+    controllerChannelManager.start()
+  }
+
+  override def shutdown(): Unit = {
+    controllerChannelManager.shutdown()
+  }
+
+  private def propagateIsrChanges(): Unit = {
+    val now = System.currentTimeMillis()
+    pendingIsrUpdates synchronized {
+      if (pendingIsrUpdates.nonEmpty) {
+        // Max ISRs to send?
+        val message = new AlterIsrRequestData()
+          .setBrokerId(brokerId)
+          .setBrokerEpoch(brokerEpoch)
+          .setTopics(new util.ArrayList())
+
+        pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, items) => {
+          val topicPart = new AlterIsrRequestTopics()
+            .setName(topic)
+            .setPartitions(new util.ArrayList())
+          message.topics().add(topicPart)
+          items.foreach(item => {
+            topicPart.partitions().add(new AlterIsrRequestPartitions()
+              .setPartitionIndex(item.topicPartition.partition())
+              .setLeaderId(item.leaderAndIsr.leader)
+              .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+              .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+              .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+            )
+          })
+        })
+
+        def responseHandler(response: ClientResponse): Unit = {
+          println(response.responseBody().toString(response.requestHeader().apiVersion()))
+          val body: AlterIsrResponse = response.responseBody().asInstanceOf[AlterIsrResponse]
+          val data: AlterIsrResponseData = body.data()
+          Errors.forCode(data.errorCode()) match {
+            case Errors.NONE => info(s"Controller handled AlterIsr request")

Review comment:
       We probably need to reduce the log level here and below.
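
       One possible shape for this, as a sketch only: successes collapse into a single debug-level summary, while each failed partition still gets its own warn. `PartitionResult` and `logLines` are hypothetical names, not part of this PR. The function returns (level, message) pairs instead of calling the `Logging` methods so that it runs standalone:

```scala
object AlterIsrResponseLogSketch {
  // Hypothetical, simplified view of one partition result; error code 0 is Errors.NONE.
  final case class PartitionResult(topic: String, partition: Int, errorCode: Short)

  // Returns (level, message) pairs rather than logging, so the sketch runs standalone.
  def logLines(results: Seq[PartitionResult]): Seq[(String, String)] = {
    val (ok, failed) = results.partition(_.errorCode == 0)
    // One quiet summary line for all successful partitions.
    val summary = "debug" -> s"Controller handled AlterIsr for ${ok.size} partition(s)"
    // Failures stay visible at warn, one line per partition.
    val failures = failed.map { r =>
      "warn" -> s"Controller returned error code ${r.errorCode} for ${r.topic}-${r.partition}"
    }
    summary +: failures
  }

  def main(args: Array[String]): Unit = {
    val results = Seq(PartitionResult("orders", 0, 0), PartitionResult("orders", 1, 3))
    logLines(results).foreach { case (level, msg) => println(s"[$level] $msg") }
  }
}
```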

##########
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##########
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 60000L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager,
+                                 val zkClient: KafkaZkClient,
+                                 val scheduler: Scheduler,
+                                 val brokerId: Int,
+                                 val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]()

Review comment:
       The term "pending" again is a little unclear. Perhaps "unsentIsrUpdates" 
would make the usage clearer.





