[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-25 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r494520245



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1764,6 +1769,145 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+alterIsrRequest.topics.forEach { topicReq =>
+  topicReq.partitions.forEach { partitionReq =>
+val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, 
partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+  }
+}
+
+def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
+  val resp = new AlterIsrResponseData()
+  results match {
+case Right(error) =>
+  resp.setErrorCode(error.code)
+case Left(partitionResults) =>
+  resp.setTopics(new util.ArrayList())
+  partitionResults
+.groupBy { case (tp, _) => tp.topic }   // Group by topic
+.foreach { case (topic, partitions) =>
+  // Add each topic part to the response
+  val topicResp = new AlterIsrResponseData.TopicData()
+.setName(topic)
+.setPartitions(new util.ArrayList())
+  resp.topics.add(topicResp)
+  partitions.foreach { case (tp, errorOrIsr) =>
+// Add each partition part to the response (new ISR or error)
+errorOrIsr match {
+  case Left(error) => topicResp.partitions.add(
+new AlterIsrResponseData.PartitionData()
+  .setPartitionIndex(tp.partition)
+  .setErrorCode(error.code))
+  case Right(leaderAndIsr) => topicResp.partitions.add(
+new AlterIsrResponseData.PartitionData()
+  .setPartitionIndex(tp.partition)
+  .setLeaderId(leaderAndIsr.leader)
+  .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+  .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+  .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+}
+}
+  }
+  }
+  callback.apply(resp)
+}
+
+eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, 
alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: 
Map[TopicPartition, LeaderAndIsr],
+  callback: AlterIsrCallback): Unit = {
+
+// Handle a few short-circuits
+if (!isActive) {
+  callback.apply(Right(Errors.NOT_CONTROLLER))
+  return
+}
+
+val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+if (brokerEpochOpt.isEmpty) {
+  info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+  callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+  return
+}
+
+if (!brokerEpochOpt.contains(brokerEpoch)) {
+  info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for 
broker $brokerId")
+  callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+  return
+}
+
+val response = try {
+  val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, 
LeaderAndIsr]]()
+
+  // Determine which partitions we will accept the new ISR for
+  val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = 
isrsToAlter.flatMap {
+case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+  val partitionError: Errors = 
controllerContext.partitionLeadershipInfo(tp) match {
+case Some(leaderIsrAndControllerEpoch) =>
+  val currentLeaderAndIsr = 
leaderIsrAndControllerEpoch.leaderAndIsr
+  if (newLeaderAndIsr.leaderEpoch < 
currentLeaderAndIsr.leaderEpoch) {

Review comment:
   I was trying to think of some kind of race with a zombie leader trying to 
update the ISR; however, this would get fenced by the leader epoch. This should 
be pretty easy to add.

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1246,6 +1351,51 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def sendAlterIsrRequest(proposedIsrState: IsrState): Boolean = {
+val isrToSendOpt: Option[Set[Int]] = proposedIsrState match {
+  case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + 
newInSyncReplicaId)
+  case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- 
outOfSyncReplicaIds)
+  case CommittedIsr(_) =>
+error(s"Asked to send AlterIsr but there are no pending updates")
+ 

[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-24 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r494523773



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1246,6 +1351,51 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def sendAlterIsrRequest(proposedIsrState: IsrState): Boolean = {
+val isrToSendOpt: Option[Set[Int]] = proposedIsrState match {
+  case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + 
newInSyncReplicaId)
+  case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- 
outOfSyncReplicaIds)
+  case CommittedIsr(_) =>
+error(s"Asked to send AlterIsr but there are no pending updates")
+None
+}
+isrToSendOpt.exists { isrToSend =>
+  val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
isrToSend.toList, zkVersion)
+  val callbackPartial = handleAlterIsrResponse(isrToSend, _ : 
Either[Errors, LeaderAndIsr])
+  alterIsrManager.enqueue(AlterIsrItem(topicPartition, newLeaderAndIsr, 
callbackPartial))
+}
+  }
+
+  private def handleAlterIsrResponse(proposedIsr: Set[Int], result: 
Either[Errors, LeaderAndIsr]): Unit = {
+inWriteLock(leaderIsrUpdateLock) {
+  result match {
+case Left(error: Errors) => error match {
+  case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
+debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} since it doesn't know about this topic or 
partition. Giving up.")
+  case Errors.FENCED_LEADER_EPOCH =>
+debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.")
+  case Errors.INVALID_UPDATE_VERSION =>
+debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to invalid zk version. Retrying.")
+sendAlterIsrRequest(isrState)
+  case _ =>
+warn(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to $error. Retrying.")
+sendAlterIsrRequest(isrState)

Review comment:
   True, we could see a new ISR from controller-initiated changes via 
LeaderAndIsr while our request is in-flight. We have a check for this on 
successful responses, but we should also check here. Since our request failed, 
we don't have a leaderEpoch to check against, so I think the best we can do is 
see if `isrState` is still pending before re-sending the request.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-17 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r490594335



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -618,9 +682,9 @@ class Partition(val topicPartition: TopicPartition,
 // since the replica's logStartOffset may have incremented
 val leaderLWIncremented = newLeaderLW > oldLeaderLW
 
-// check if we need to expand ISR to include this replica
-// if it is not in the ISR yet
-if (!inSyncReplicaIds.contains(followerId))
+// Check if this in-sync replica needs to be added to the ISR. We look 
at the "maximal" ISR here so we don't
+// send an additional Alter ISR request for the same replica

Review comment:
   Yeah, checking the maximal set isn't needed anymore since we added the 
sealed trait. I'll just update this to simply call `maybeExpandIsr`, which will 
do the check you propose here.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-17 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r490562630



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -920,7 +986,7 @@ class Partition(val topicPartition: TopicPartition,
  * is violated, that replica is considered to be out of sync
  *
  **/
-val candidateReplicaIds = inSyncReplicaIds - localBrokerId
+val candidateReplicaIds = isrState.maximalIsr - localBrokerId

Review comment:
   Also, yes it's confusing to refer to `maximalIsr` here even though it 
should always equal the committed ISR at this point (assuming we check for 
inflight first). 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-17 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r490561028



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -920,7 +986,7 @@ class Partition(val topicPartition: TopicPartition,
  * is violated, that replica is considered to be out of sync
  *
  **/
-val candidateReplicaIds = inSyncReplicaIds - localBrokerId
+val candidateReplicaIds = isrState.maximalIsr - localBrokerId

Review comment:
   Makes sense, that will also satisfy your other comment about not 
checking for inflight requests within the write lock





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-17 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r490542749



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -200,9 +241,11 @@ class Partition(val topicPartition: TopicPartition,
   // defined when this broker is leader for partition
   @volatile private var leaderEpochStartOffsetOpt: Option[Long] = None
   @volatile var leaderReplicaIdOpt: Option[Int] = None
-  @volatile var inSyncReplicaIds = Set.empty[Int]
+  @volatile var isrState: IsrState = CommittedIsr(Set.empty)

Review comment:
   Yeah, I was thinking we should move the ISR to a separate public accessor. 
I'll change this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-16 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r489756083



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1317,7 +1326,9 @@ class ReplicaManager(val config: KafkaConfig,
 partitionOpt.foreach { partition =>
   val currentLeaderEpoch = partition.getLeaderEpoch
   val requestLeaderEpoch = partitionState.leaderEpoch
-  if (requestLeaderEpoch > currentLeaderEpoch) {
+  val currentZkVersion = partition.getZkVersion
+  val requestZkVersion = partitionState.zkVersion
+  if (requestLeaderEpoch > currentLeaderEpoch || requestZkVersion 
> currentZkVersion) {

Review comment:
   No, we don't need this anymore. This was added so a LeaderAndIsr could 
update the Partition state without a leader epoch bump, but we don't have that 
flow anymore so we can revert this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-16 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r489753946



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3045,6 +3045,24 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
+val alterIsrRequest = request.body[AlterIsrRequest]
+
+if (!controller.isActive) {

Review comment:
   This is actually a really good point. I filed a JIRA to fix this in 
other places in KafkaApis https://issues.apache.org/jira/browse/KAFKA-10491





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-16 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r489747101



##
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrManager {
+  def start(): Unit
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Boolean
+
+  def clearPending(topicPartition: TopicPartition): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr, callback: Either[Errors, LeaderAndIsr] => Unit)
+
+class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+  val zkClient: KafkaZkClient,
+  val scheduler: Scheduler,
+  val time: Time,
+  val brokerId: Int,
+  val brokerEpochSupplier: () => Long) extends 
AlterIsrManager with Logging with KafkaMetricsGroup {
+
+  // Used to allow only one pending ISR update per partition
+  private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new 
ConcurrentHashMap[TopicPartition, AlterIsrItem]()
+
+  // Used to allow only one in-flight request at a time
+  private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  override def start(): Unit = {
+scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, 
TimeUnit.MILLISECONDS)
+  }
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Boolean = {
+unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == 
null
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+unsentIsrUpdates.remove(topicPartition)
+  }
+
+  private def propagateIsrChanges(): Unit = {
+if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, 
true)) {
+  // Copy current unsent ISRs but don't remove from the map
+  val inflightAlterIsrItems: mutable.Queue[AlterIsrItem] = new 
mutable.Queue[AlterIsrItem]()
+  unsentIsrUpdates.values().forEach(item => 
inflightAlterIsrItems.enqueue(item))
+
+  val now = time.milliseconds()
+  lastIsrPropagationMs.set(now)
+
+  buildAndSendRequest(inflightAlterIsrItems.toSeq)
+}
+  }
+
+  def buildAndSendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
+val message = new AlterIsrRequestData()
+  .setBrokerId(brokerId)
+  .setBrokerEpoch(brokerEpochSupplier.apply())
+  .setTopics(new util.ArrayList())
+
+inflightAlterIsrItems.groupBy(_.topicPartition.topic).foreach(entry => {
+  val topicPart = new AlterIsrRequestData.TopicData()
+.setName(entry._1)
+.setPartitions(new util.ArrayList())
+  message.topics().add(topicPart)
+  entry._2.foreach(item => {
+topicPart.partitions().add(new AlterIsrRequestData.PartitionData()
+  .setPartitionIndex(item.topicPartition.partition)
+  .setLeaderId(item.leaderAndIsr.leader)
+  .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+  .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+  .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+)

[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-16 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r489745539



##
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrManager {
+  def start(): Unit
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Boolean
+
+  def clearPending(topicPartition: TopicPartition): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr, callback: Either[Errors, LeaderAndIsr] => Unit)
+
+class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+  val zkClient: KafkaZkClient,
+  val scheduler: Scheduler,
+  val time: Time,
+  val brokerId: Int,
+  val brokerEpochSupplier: () => Long) extends 
AlterIsrManager with Logging with KafkaMetricsGroup {
+
+  // Used to allow only one pending ISR update per partition
+  private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new 
ConcurrentHashMap[TopicPartition, AlterIsrItem]()
+
+  // Used to allow only one in-flight request at a time
+  private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  override def start(): Unit = {
+scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, 
TimeUnit.MILLISECONDS)
+  }
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Boolean = {
+unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == 
null
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+unsentIsrUpdates.remove(topicPartition)
+  }
+
+  private def propagateIsrChanges(): Unit = {
+if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, 
true)) {
+  // Copy current unsent ISRs but don't remove from the map
+  val inflightAlterIsrItems: mutable.Queue[AlterIsrItem] = new 
mutable.Queue[AlterIsrItem]()
+  unsentIsrUpdates.values().forEach(item => 
inflightAlterIsrItems.enqueue(item))
+
+  val now = time.milliseconds()
+  lastIsrPropagationMs.set(now)
+
+  buildAndSendRequest(inflightAlterIsrItems.toSeq)
+}
+  }
+
+  def buildAndSendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
+val message = new AlterIsrRequestData()
+  .setBrokerId(brokerId)
+  .setBrokerEpoch(brokerEpochSupplier.apply())
+  .setTopics(new util.ArrayList())
+
+inflightAlterIsrItems.groupBy(_.topicPartition.topic).foreach(entry => {
+  val topicPart = new AlterIsrRequestData.TopicData()
+.setName(entry._1)
+.setPartitions(new util.ArrayList())
+  message.topics().add(topicPart)
+  entry._2.foreach(item => {
+topicPart.partitions().add(new AlterIsrRequestData.PartitionData()
+  .setPartitionIndex(item.topicPartition.partition)
+  .setLeaderId(item.leaderAndIsr.leader)
+  .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+  .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+  .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+)

[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-16 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r489744561



##
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrManager {
+  def start(): Unit
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Boolean
+
+  def clearPending(topicPartition: TopicPartition): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr, callback: Either[Errors, LeaderAndIsr] => Unit)
+
+class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+  val zkClient: KafkaZkClient,
+  val scheduler: Scheduler,
+  val time: Time,
+  val brokerId: Int,
+  val brokerEpochSupplier: () => Long) extends 
AlterIsrManager with Logging with KafkaMetricsGroup {
+
+  // Used to allow only one pending ISR update per partition
+  private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new 
ConcurrentHashMap[TopicPartition, AlterIsrItem]()
+
+  // Used to allow only one in-flight request at a time
+  private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  override def start(): Unit = {
+scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, 
TimeUnit.MILLISECONDS)
+  }
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Boolean = {
+unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == 
null
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+unsentIsrUpdates.remove(topicPartition)
+  }
+
+  private def propagateIsrChanges(): Unit = {
+if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, 
true)) {
+  // Copy current unsent ISRs but don't remove from the map
+  val inflightAlterIsrItems: mutable.Queue[AlterIsrItem] = new 
mutable.Queue[AlterIsrItem]()
+  unsentIsrUpdates.values().forEach(item => 
inflightAlterIsrItems.enqueue(item))
+
+  val now = time.milliseconds()
+  lastIsrPropagationMs.set(now)
+
+  buildAndSendRequest(inflightAlterIsrItems.toSeq)
+}
+  }
+
+  def buildAndSendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
+val message = new AlterIsrRequestData()
+  .setBrokerId(brokerId)
+  .setBrokerEpoch(brokerEpochSupplier.apply())
+  .setTopics(new util.ArrayList())
+
+inflightAlterIsrItems.groupBy(_.topicPartition.topic).foreach(entry => {
+  val topicPart = new AlterIsrRequestData.TopicData()
+.setName(entry._1)
+.setPartitions(new util.ArrayList())
+  message.topics().add(topicPart)
+  entry._2.foreach(item => {
+topicPart.partitions().add(new AlterIsrRequestData.PartitionData()
+  .setPartitionIndex(item.topicPartition.partition)
+  .setLeaderId(item.leaderAndIsr.leader)
+  .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+  .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+  .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+)

[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-16 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r489740479



##
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrManager {
+  def start(): Unit
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Boolean
+
+  def clearPending(topicPartition: TopicPartition): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr, callback: Either[Errors, LeaderAndIsr] => Unit)
+
+class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+  val zkClient: KafkaZkClient,
+  val scheduler: Scheduler,
+  val time: Time,
+  val brokerId: Int,
+  val brokerEpochSupplier: () => Long) extends 
AlterIsrManager with Logging with KafkaMetricsGroup {
+
+  // Used to allow only one pending ISR update per partition
+  private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new 
ConcurrentHashMap[TopicPartition, AlterIsrItem]()
+
+  // Used to allow only one in-flight request at a time
+  private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  override def start(): Unit = {
+scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, 
TimeUnit.MILLISECONDS)

Review comment:
   Currently we impose a 2.5s delay for the old ZK-based ISR propagation 
method. We could probably increase this 50ms up to a few hundred without any 
ill effects. We still benefit from the fact that we assume the maximal ISR 
immediately. How about 200ms?
   
   Longer term we can look into having a single thread invocation that sits in 
a while loop trying to consume from a LinkedBlockingQueue or maybe even a 
SynchronousQueue. But agreed we should leave this for later.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-16 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r489730265



##
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrManager {
+  def start(): Unit
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Boolean
+
+  def clearPending(topicPartition: TopicPartition): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr, callback: Either[Errors, LeaderAndIsr] => Unit)
+
+class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+  val zkClient: KafkaZkClient,
+  val scheduler: Scheduler,
+  val time: Time,
+  val brokerId: Int,
+  val brokerEpochSupplier: () => Long) extends 
AlterIsrManager with Logging with KafkaMetricsGroup {
+
+  // Used to allow only one pending ISR update per partition
+  private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new 
ConcurrentHashMap[TopicPartition, AlterIsrItem]()
+
+  // Used to allow only one in-flight request at a time
+  private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  override def start(): Unit = {
+scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, 
TimeUnit.MILLISECONDS)
+  }
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Boolean = {
+unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == 
null
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+unsentIsrUpdates.remove(topicPartition)
+  }
+
+  private def propagateIsrChanges(): Unit = {
+if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, 
true)) {
+  // Copy current unsent ISRs but don't remove from the map
+  val inflightAlterIsrItems: mutable.Queue[AlterIsrItem] = new 
mutable.Queue[AlterIsrItem]()
+  unsentIsrUpdates.values().forEach(item => 
inflightAlterIsrItems.enqueue(item))
+
+  val now = time.milliseconds()
+  lastIsrPropagationMs.set(now)
+
+  buildAndSendRequest(inflightAlterIsrItems.toSeq)
+}
+  }
+
+  def buildAndSendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
+val message = new AlterIsrRequestData()
+  .setBrokerId(brokerId)
+  .setBrokerEpoch(brokerEpochSupplier.apply())
+  .setTopics(new util.ArrayList())
+
+inflightAlterIsrItems.groupBy(_.topicPartition.topic).foreach(entry => {
+  val topicPart = new AlterIsrRequestData.TopicData()
+.setName(entry._1)
+.setPartitions(new util.ArrayList())
+  message.topics().add(topicPart)
+  entry._2.foreach(item => {
+topicPart.partitions().add(new AlterIsrRequestData.PartitionData()
+  .setPartitionIndex(item.topicPartition.partition)
+  .setLeaderId(item.leaderAndIsr.leader)
+  .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+  .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+  .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+)

[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-16 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r489721316



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -617,9 +665,9 @@ class Partition(val topicPartition: TopicPartition,
 // since the replica's logStartOffset may have incremented
 val leaderLWIncremented = newLeaderLW > oldLeaderLW
 
-// check if we need to expand ISR to include this replica
-// if it is not in the ISR yet
-if (!inSyncReplicaIds.contains(followerId))
+// Check if this in-sync replica needs to be added to the ISR. We look 
at the "maximal" ISR here so we don't

Review comment:
   "maximal" works for me 👍 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-16 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r489720971



##
File path: config/log4j.properties
##
@@ -76,8 +76,8 @@ log4j.additivity.kafka.request.logger=false
 log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
 log4j.additivity.kafka.network.RequestChannel$=false
 
-log4j.logger.kafka.controller=TRACE, controllerAppender
-log4j.additivity.kafka.controller=false
+log4j.logger.kafka.controller=DEBUG, controllerAppender

Review comment:
   Yup, my mistake, shouldn't have been committed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-16 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r489703810



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1210,6 +1317,37 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def sendAlterIsrRequest(newIsr: Set[Int]): Boolean = {
+val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
+alterIsrManager.enqueueIsrUpdate(AlterIsrItem(topicPartition, 
newLeaderAndIsr, result => {
+  inWriteLock(leaderIsrUpdateLock) {
+result match {
+  case Left(error: Errors) => error match {
+case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
+  debug(s"Controller failed to update ISR for $topicPartition 
since it doesn't know about this partition. Giving up.")
+case Errors.FENCED_LEADER_EPOCH =>
+  debug(s"Controller failed to update ISR for $topicPartition 
since we sent an old leader epoch. Giving up.")
+case _ =>
+  pendingInSyncReplicaIds = None

Review comment:
   I think this has been a long-standing bad assumption on my part in this 
PR. I've been (mis)treating `pendingInSyncReplicaIds` as a mechanism for 
initiating a retry along with its other semantics. You're right though, 
explicitly re-sending the ISR is definitely better.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-10 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r486535868



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1210,19 +1243,66 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
-  private def expandIsr(newIsr: Set[Int]): Unit = {
-val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
+  private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
+if (useAlterIsr) {
+  expandIsrWithAlterIsr(newInSyncReplica)
+} else {
+  expandIsrWithZk(newInSyncReplica)
+}
+  }
+
+  private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = {
+// This is called from maybeExpandIsr which holds the ISR write lock
+if (pendingInSyncReplicaIds.isEmpty) {
+  // When expanding the ISR, we can safely assume the new replica will 
make it into the ISR since this puts us in
+  // a more constrained state for advancing the HW.
+  val newIsr = inSyncReplicaIds + newInSyncReplica
+  pendingInSyncReplicaIds = Some(newIsr)
+  debug(s"Adding new in-sync replica $newInSyncReplica. Pending ISR 
updated to [${newIsr.mkString(",")}] for $topicPartition")
+  alterIsr(newIsr)
+} else {
+  debug(s"ISR update in-flight, not adding new in-sync replica 
$newInSyncReplica for $topicPartition")
+}
+  }
+
+  private def expandIsrWithZk(newInSyncReplica: Int): Unit = {
+val newInSyncReplicaIds = inSyncReplicaIds + newInSyncReplica
+info(s"Expanding ISR from ${inSyncReplicaIds.mkString(",")} to 
${newInSyncReplicaIds.mkString(",")}")
+val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newInSyncReplicaIds.toList, zkVersion)
 val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
-maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+maybeUpdateIsrAndVersionWithZk(newInSyncReplicaIds, zkVersionOpt)
   }
 
-  private[cluster] def shrinkIsr(newIsr: Set[Int]): Unit = {
+  private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
+if (useAlterIsr) {
+  shrinkIsrWithAlterIsr(outOfSyncReplicas)
+} else {
+  shrinkIsrWithZk(inSyncReplicaIds -- outOfSyncReplicas)
+}
+  }
+
+  private def shrinkIsrWithAlterIsr(outOfSyncReplicas: Set[Int]): Unit = {
+// This is called from maybeShrinkIsr which holds the ISR write lock
+if (pendingInSyncReplicaIds.isEmpty) {
+  // When shrinking the ISR, we cannot assume that the update will succeed 
as this could erroneously advance the HW
+  // We update pendingInSyncReplicaIds here simply to prevent any further 
ISR updates from occurring until we get
+  // the next LeaderAndIsr
+  pendingInSyncReplicaIds = Some(inSyncReplicaIds)

Review comment:
   How about `uncommittedInSyncReplicaIds`?

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -278,10 +284,10 @@ class KafkaController(val config: KafkaConfig,
   private def onControllerResignation(): Unit = {
 debug("Resigning")
 // de-register listeners
-
zkClient.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
 zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
 zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
 
zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
+zkClient.unregisterStateChangeHandler(isrChangeNotificationHandler.path)

Review comment:
   nope, will revert





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-10 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r486535166



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1210,19 +1243,66 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
-  private def expandIsr(newIsr: Set[Int]): Unit = {
-val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
+  private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
+if (useAlterIsr) {
+  expandIsrWithAlterIsr(newInSyncReplica)
+} else {
+  expandIsrWithZk(newInSyncReplica)
+}
+  }
+
+  private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = {
+// This is called from maybeExpandIsr which holds the ISR write lock
+if (pendingInSyncReplicaIds.isEmpty) {

Review comment:
   Yea, good idea. I'll leave the check here since we actually acquire and 
release the lock when checking if we should expand/shrink. It's possible that 
pendingInSyncReplicaIds is cleared by a LeaderAndIsr before we acquire the 
write lock to do the update





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-10 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r486535166



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1210,19 +1243,66 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
-  private def expandIsr(newIsr: Set[Int]): Unit = {
-val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
+  private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
+if (useAlterIsr) {
+  expandIsrWithAlterIsr(newInSyncReplica)
+} else {
+  expandIsrWithZk(newInSyncReplica)
+}
+  }
+
+  private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = {
+// This is called from maybeExpandIsr which holds the ISR write lock
+if (pendingInSyncReplicaIds.isEmpty) {

Review comment:
   Yea, good idea. I'll leave the check here since we actually acquire and 
release the lock when checking if we should expand/shrink. It's possible that 
pendingInSyncReplicaIds is cleared by a LeaderAndIsr between then and when we 
acquire the write lock to do the update





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-10 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r486535166



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1210,19 +1243,66 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
-  private def expandIsr(newIsr: Set[Int]): Unit = {
-val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
+  private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
+if (useAlterIsr) {
+  expandIsrWithAlterIsr(newInSyncReplica)
+} else {
+  expandIsrWithZk(newInSyncReplica)
+}
+  }
+
+  private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = {
+// This is called from maybeExpandIsr which holds the ISR write lock
+if (pendingInSyncReplicaIds.isEmpty) {

Review comment:
   Yea, good idea. I'll leave the check here since we actually acquire and 
release the lock when checking if we should expand/shrink. It's possible that 
pendingInSyncReplicaIds is cleared by an update between then and when we 
acquire the write lock to do the update





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-10 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r486521810



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -255,6 +265,15 @@ class Partition(val topicPartition: TopicPartition,
 
   def isAddingReplica(replicaId: Int): Boolean = 
assignmentState.isAddingReplica(replicaId)
 
+  def inSyncReplicaIds(includeUncommittedReplicas: Boolean = false): Set[Int] 
= {

Review comment:
   I think this sounds good, explicit over implicit and all that. If we have 
two methods like this, should we then make `inSyncReplicaIds` a private member? 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-10 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r486521810



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -255,6 +265,15 @@ class Partition(val topicPartition: TopicPartition,
 
   def isAddingReplica(replicaId: Int): Boolean = 
assignmentState.isAddingReplica(replicaId)
 
+  def inSyncReplicaIds(includeUncommittedReplicas: Boolean = false): Set[Int] 
= {

Review comment:
   I think this sounds good, explicit over implicit and all that. If we 
create two methods, should we then make `inSyncReplicaIds` a private member? 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-26 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r478069145



##
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrManager {
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr, callback: Errors => Unit)
+
+class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+  val zkClient: KafkaZkClient,
+  val scheduler: Scheduler,
+  val time: Time,
+  val brokerId: Int) extends AlterIsrManager with 
Logging with KafkaMetricsGroup {
+
+  private val unsentIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+unsentIsrUpdates synchronized {
+  unsentIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+  lastIsrChangeMs.set(time.milliseconds)
+  // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+  // in fast succession
+  if (scheduledRequest.isEmpty) {
+scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+  }
+}
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+unsentIsrUpdates synchronized {
+  // when we get a new LeaderAndIsr, we clear out any pending requests
+  unsentIsrUpdates.remove(topicPartition)
+}
+  }
+
+  private def propagateIsrChanges(): Unit = {
+val now = time.milliseconds()
+unsentIsrUpdates synchronized {
+  if (unsentIsrUpdates.nonEmpty) {
+val brokerEpoch: Long = zkClient.getBrokerEpoch(brokerId) match {

Review comment:
   I wasn't too happy about this. Is there another way to get the current 
broker epoch? As I understand it, the broker epoch can change during the 
lifecycle of a broker. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-26 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r478067221



##
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrManager {
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr, callback: Errors => Unit)
+
+class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+  val zkClient: KafkaZkClient,
+  val scheduler: Scheduler,
+  val time: Time,
+  val brokerId: Int) extends AlterIsrManager with 
Logging with KafkaMetricsGroup {
+
+  private val unsentIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+unsentIsrUpdates synchronized {
+  unsentIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+  lastIsrChangeMs.set(time.milliseconds)
+  // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+  // in fast succession
+  if (scheduledRequest.isEmpty) {

Review comment:
   I think that sounds pretty reasonable. Would we need any kind of timeout 
at this layer, or just rely on the underlying channel to provide timeouts?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-24 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r475626573



##
File path: config/log4j.properties
##
@@ -61,8 +61,8 @@ 
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 log4j.logger.org.apache.zookeeper=INFO
 
 # Change the two lines below to adjust the general broker logging level 
(output to server.log and stdout)
-log4j.logger.kafka=INFO
-log4j.logger.org.apache.kafka=INFO
+log4j.logger.kafka=DEBUG

Review comment:
   Need to revert this stuff, didn't mean to commit





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-24 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r475632719



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1257,4 +1364,4 @@ class Partition(val topicPartition: TopicPartition,
 }
 partitionString.toString
   }
-}
+}

Review comment:
   newline





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-24 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r475625111



##
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##
@@ -325,7 +326,8 @@
 UNSTABLE_OFFSET_COMMIT(88, "There are unstable offsets that need to be 
cleared.", UnstableOffsetCommitException::new),
 THROTTLING_QUOTA_EXCEEDED(89, "The throttling quota has been exceeded.", 
ThrottlingQuotaExceededException::new),
 PRODUCER_FENCED(90, "There is a newer producer with the same 
transactionalId " +
-"which fences the current one.", ProducerFencedException::new);
+"which fences the current one.", ProducerFencedException::new),
+INVALID_UPDATE_VERSION(91, "The given ISR version was out-of-date.", 
InvalidUpdateVersionException::new);

Review comment:
   This error message should be less specific





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-24 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r475630695



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -806,8 +840,9 @@ class Partition(val topicPartition: TopicPartition,
   // avoid unnecessary collection generation
   var newHighWatermark = leaderLog.logEndOffsetMetadata
   remoteReplicasMap.values.foreach { replica =>
+// Note here we are using effectiveInSyncReplicaIds, see explanation 
above

Review comment:
   Fix comment to refer to correct variable





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-24 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r475629593



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -635,7 +666,7 @@ class Partition(val topicPartition: TopicPartition,
 
 // check if we need to expand ISR to include this replica
 // if it is not in the ISR yet

Review comment:
   Expand on this comment to discuss the maximal ISR





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-24 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r475627290



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -141,7 +142,8 @@ object Partition extends KafkaMetricsGroup {
   stateStore = zkIsrBackingStore,
   delayedOperations = delayedOperations,
   metadataCache = replicaManager.metadataCache,
-  logManager = replicaManager.logManager)
+  logManager = replicaManager.logManager,
+  alterIsrChannelManager = replicaManager.alterIsrManager)

Review comment:
   Rename to alterIsrManager





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-21 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r474959176



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+ val zkClient: KafkaZkClient,
+ val scheduler: Scheduler,
+ val brokerId: Int,
+ val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+pendingIsrUpdates synchronized {
+  pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+  lastIsrChangeMs.set(System.currentTimeMillis())
+  // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+  // in fast succession
+  if (scheduledRequest.isEmpty) {
+scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+  }
+}
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+pendingIsrUpdates synchronized {
+  // when we get a new LeaderAndIsr, we clear out any pending requests
+  pendingIsrUpdates.remove(topicPartition)
+}
+  }
+
+  override def startup(): Unit = {
+controllerChannelManager.start()
+  }
+
+  override def shutdown(): Unit = {
+controllerChannelManager.shutdown()
+  }
+
+  private def propagateIsrChanges(): Unit = {
+val now = System.currentTimeMillis()
+pendingIsrUpdates synchronized {
+  if (pendingIsrUpdates.nonEmpty) {
+// Max ISRs to send?
+val message = new AlterIsrRequestData()
+  .setBrokerId(brokerId)
+  .setBrokerEpoch(brokerEpoch)
+  .setTopics(new util.ArrayList())
+
+
pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, 
items) => {
+  val topicPart = new AlterIsrRequestTopics()
+.setName(topic)
+.setPartitions(new util.ArrayList())
+  message.topics().add(topicPart)
+  items.foreach(item => {
+topicPart.partitions().add(new AlterIsrRequestPartitions()
+  .setPartitionIndex(item.topicPartition.partition())
+  .setLeaderId(item.leaderAndIsr.leader)
+  .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+  .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+  .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+)
+  })
+})
+
+def responseHandler(response: ClientResponse): Unit = {

Review comment:
   Continued in 
https://github.com/apache/kafka/pull/9100#discussion_r474934833





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-21 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r474934833



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+ val zkClient: KafkaZkClient,
+ val scheduler: Scheduler,
+ val brokerId: Int,
+ val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+pendingIsrUpdates synchronized {
+  pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+  lastIsrChangeMs.set(System.currentTimeMillis())
+  // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+  // in fast succession
+  if (scheduledRequest.isEmpty) {
+scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+  }
+}
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+pendingIsrUpdates synchronized {
+  // when we get a new LeaderAndIsr, we clear out any pending requests
+  pendingIsrUpdates.remove(topicPartition)

Review comment:
   Update: after some discussion and looking over failed system tests, we 
ended up with the following error handling:
   
   * REPLICA_NOT_AVAILABLE and INVALID_REPLICA_ASSIGNMENT will clear the 
pending ISR to let the leader retry. This covers a case where a leader tries to 
add a replica to the ISR which is offline because it (the follower) just 
finished shutdown.
   * INVALID_UPDATE_VERSION will not clear the pending ISR since the broker has 
stale metadata.
   * FENCED_LEADER_EPOCH, NOT_LEADER_OR_FOLLOWER, UNKNOWN_TOPIC_OR_PARTITION 
will _not_ clear the pending state and therefore will not retry. We presume here 
that the controller is correct and the leader has old metadata. By not clearing 
the pending ISR, the leader will await a LeaderAndIsr before attempting any 
further ISR changes.
   * Other unspecified errors: clear the pending state and let the leader 
retry. Not sure what cases could cause other errors, but it is probably better 
to be in a retry loop than to be completely stuck





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-21 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r474934833



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+ val zkClient: KafkaZkClient,
+ val scheduler: Scheduler,
+ val brokerId: Int,
+ val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+pendingIsrUpdates synchronized {
+  pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+  lastIsrChangeMs.set(System.currentTimeMillis())
+  // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+  // in fast succession
+  if (scheduledRequest.isEmpty) {
+scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+  }
+}
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+pendingIsrUpdates synchronized {
+  // when we get a new LeaderAndIsr, we clear out any pending requests
+  pendingIsrUpdates.remove(topicPartition)

Review comment:
   Update: after some discussion and looking over failed system tests, we 
ended up with the following error handling:
   
   * REPLICA_NOT_AVAILABLE and INVALID_REPLICA_ASSIGNMENT will clear the 
pending ISR to let the leader retry. This covers a case where a leader tries to 
add a replica to the ISR which is offline because it (the follower) just 
finished shutdown.
   * FENCED_LEADER_EPOCH, NOT_LEADER_OR_FOLLOWER, UNKNOWN_TOPIC_OR_PARTITION 
will _not_ clear the pending state and therefore will not retry. We presume here 
that the controller is correct and the leader has old metadata. By not clearing 
the pending ISR, the leader will await a LeaderAndIsr before attempting any 
further ISR changes.
   * Other unspecified errors: clear the pending state and let the leader 
retry. Not sure what cases could cause other errors, but it is probably better 
to be in a retry loop than to be completely stuck





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-21 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r474930329



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+ val zkClient: KafkaZkClient,
+ val scheduler: Scheduler,
+ val brokerId: Int,
+ val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+pendingIsrUpdates synchronized {
+  pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+  lastIsrChangeMs.set(System.currentTimeMillis())
+  // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+  // in fast succession
+  if (scheduledRequest.isEmpty) {
+scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+  }
+}
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+pendingIsrUpdates synchronized {
+  // when we get a new LeaderAndIsr, we clear out any pending requests
+  pendingIsrUpdates.remove(topicPartition)
+}
+  }
+
+  override def startup(): Unit = {
+controllerChannelManager.start()
+  }
+
+  override def shutdown(): Unit = {
+controllerChannelManager.shutdown()
+  }
+
+  private def propagateIsrChanges(): Unit = {
+val now = System.currentTimeMillis()
+pendingIsrUpdates synchronized {
+  if (pendingIsrUpdates.nonEmpty) {
+// Max ISRs to send?
+val message = new AlterIsrRequestData()
+  .setBrokerId(brokerId)
+  .setBrokerEpoch(brokerEpoch)
+  .setTopics(new util.ArrayList())
+
+
pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, 
items) => {
+  val topicPart = new AlterIsrRequestTopics()

Review comment:
   It's a little tricky since LeaderAndIsr isn't visible to 
AlterIsrRequest. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-20 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r474106591



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+ val zkClient: KafkaZkClient,
+ val scheduler: Scheduler,
+ val brokerId: Int,
+ val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+pendingIsrUpdates synchronized {
+  pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+  lastIsrChangeMs.set(System.currentTimeMillis())
+  // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+  // in fast succession
+  if (scheduledRequest.isEmpty) {
+scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+  }
+}
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+pendingIsrUpdates synchronized {
+  // when we get a new LeaderAndIsr, we clear out any pending requests
+  pendingIsrUpdates.remove(topicPartition)
+}
+  }
+
+  override def startup(): Unit = {
+controllerChannelManager.start()
+  }
+
+  override def shutdown(): Unit = {
+controllerChannelManager.shutdown()
+  }
+
+  private def propagateIsrChanges(): Unit = {
+val now = System.currentTimeMillis()
+pendingIsrUpdates synchronized {
+  if (pendingIsrUpdates.nonEmpty) {
+// Max ISRs to send?
+val message = new AlterIsrRequestData()
+  .setBrokerId(brokerId)
+  .setBrokerEpoch(brokerEpoch)
+  .setTopics(new util.ArrayList())
+
+
pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, 
items) => {
+  val topicPart = new AlterIsrRequestTopics()
+.setName(topic)
+.setPartitions(new util.ArrayList())
+  message.topics().add(topicPart)
+  items.foreach(item => {
+topicPart.partitions().add(new AlterIsrRequestPartitions()
+  .setPartitionIndex(item.topicPartition.partition())
+  .setLeaderId(item.leaderAndIsr.leader)
+  .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+  .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+  .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+)
+  })
+})
+
+def responseHandler(response: ClientResponse): Unit = {

Review comment:
   I found a race during the system tests when a follower is shutting down. 
The controller handles the shutdown before it handles an AlterIsr. If the 
proposed ISR includes the now-offline replica, the controller refuses to apply 
that ISR change and returns an error for that partition. It then sends out the 
current LeaderAndIsr.
   
   The problem is that the broker ignores this LeaderAndIsr since it has the 
same leader epoch. This is easy enou

[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-20 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r474106591



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+ val zkClient: KafkaZkClient,
+ val scheduler: Scheduler,
+ val brokerId: Int,
+ val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+pendingIsrUpdates synchronized {
+  pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+  lastIsrChangeMs.set(System.currentTimeMillis())
+  // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+  // in fast succession
+  if (scheduledRequest.isEmpty) {
+scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+  }
+}
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+pendingIsrUpdates synchronized {
+  // when we get a new LeaderAndIsr, we clear out any pending requests
+  pendingIsrUpdates.remove(topicPartition)
+}
+  }
+
+  override def startup(): Unit = {
+controllerChannelManager.start()
+  }
+
+  override def shutdown(): Unit = {
+controllerChannelManager.shutdown()
+  }
+
+  private def propagateIsrChanges(): Unit = {
+val now = System.currentTimeMillis()
+pendingIsrUpdates synchronized {
+  if (pendingIsrUpdates.nonEmpty) {
+// Max ISRs to send?
+val message = new AlterIsrRequestData()
+  .setBrokerId(brokerId)
+  .setBrokerEpoch(brokerEpoch)
+  .setTopics(new util.ArrayList())
+
+
pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, 
items) => {
+  val topicPart = new AlterIsrRequestTopics()
+.setName(topic)
+.setPartitions(new util.ArrayList())
+  message.topics().add(topicPart)
+  items.foreach(item => {
+topicPart.partitions().add(new AlterIsrRequestPartitions()
+  .setPartitionIndex(item.topicPartition.partition())
+  .setLeaderId(item.leaderAndIsr.leader)
+  .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+  .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+  .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+)
+  })
+})
+
+def responseHandler(response: ClientResponse): Unit = {

Review comment:
   I found a race during the system tests when a broker is shutting down. 
The controller handles the shutdown before it handles an AlterIsr. If the 
proposed ISR includes the now-offline replica, the controller refuses to apply 
that ISR change and returns an error for that partition. It then sends out the 
current LeaderAndIsr.
   
   The problem is that the broker ignores this LeaderAndIsr since it has the 
same leader epoch. This is easy enough

[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-20 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r474070847



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+ val zkClient: KafkaZkClient,
+ val scheduler: Scheduler,
+ val brokerId: Int,
+ val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+pendingIsrUpdates synchronized {
+  pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+  lastIsrChangeMs.set(System.currentTimeMillis())
+  // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+  // in fast succession
+  if (scheduledRequest.isEmpty) {
+scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+  }
+}
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+pendingIsrUpdates synchronized {
+  // when we get a new LeaderAndIsr, we clear out any pending requests
+  pendingIsrUpdates.remove(topicPartition)
+}
+  }
+
+  override def startup(): Unit = {
+controllerChannelManager.start()
+  }
+
+  override def shutdown(): Unit = {
+controllerChannelManager.shutdown()
+  }
+
+  private def propagateIsrChanges(): Unit = {
+val now = System.currentTimeMillis()
+pendingIsrUpdates synchronized {
+  if (pendingIsrUpdates.nonEmpty) {
+// Max ISRs to send?
+val message = new AlterIsrRequestData()
+  .setBrokerId(brokerId)
+  .setBrokerEpoch(brokerEpoch)
+  .setTopics(new util.ArrayList())
+
+
pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, 
items) => {
+  val topicPart = new AlterIsrRequestTopics()
+.setName(topic)
+.setPartitions(new util.ArrayList())
+  message.topics().add(topicPart)
+  items.foreach(item => {
+topicPart.partitions().add(new AlterIsrRequestPartitions()
+  .setPartitionIndex(item.topicPartition.partition())
+  .setLeaderId(item.leaderAndIsr.leader)
+  .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+  .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+  .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+)
+  })
+})
+
+def responseHandler(response: ClientResponse): Unit = {

Review comment:
   Turns out we do need to clear it. In the common case, the controller 
will still send out a LeaderAndIsr following a failed ISR update. However, during 
testing I did find a case where a LeaderAndIsr failed to go out, leading to a 
stuck leader.
   
   * ISR = {1,2}
   * Broker 3 comes up
   * Leader sends ISR = {1,2,3}
   * Broker 3 goes down
   * Controller handles broker shutdown
   * Controller handles AlterIsr, resulting LeaderAndIsr batch fails

[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-20 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r474070847



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+ val zkClient: KafkaZkClient,
+ val scheduler: Scheduler,
+ val brokerId: Int,
+ val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+pendingIsrUpdates synchronized {
+  pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+  lastIsrChangeMs.set(System.currentTimeMillis())
+  // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+  // in fast succession
+  if (scheduledRequest.isEmpty) {
+scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+  }
+}
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+pendingIsrUpdates synchronized {
+  // when we get a new LeaderAndIsr, we clear out any pending requests
+  pendingIsrUpdates.remove(topicPartition)
+}
+  }
+
+  override def startup(): Unit = {
+controllerChannelManager.start()
+  }
+
+  override def shutdown(): Unit = {
+controllerChannelManager.shutdown()
+  }
+
+  private def propagateIsrChanges(): Unit = {
+val now = System.currentTimeMillis()
+pendingIsrUpdates synchronized {
+  if (pendingIsrUpdates.nonEmpty) {
+// Max ISRs to send?
+val message = new AlterIsrRequestData()
+  .setBrokerId(brokerId)
+  .setBrokerEpoch(brokerEpoch)
+  .setTopics(new util.ArrayList())
+
+
pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, 
items) => {
+  val topicPart = new AlterIsrRequestTopics()
+.setName(topic)
+.setPartitions(new util.ArrayList())
+  message.topics().add(topicPart)
+  items.foreach(item => {
+topicPart.partitions().add(new AlterIsrRequestPartitions()
+  .setPartitionIndex(item.topicPartition.partition())
+  .setLeaderId(item.leaderAndIsr.leader)
+  .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+  .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+  .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+)
+  })
+})
+
+def responseHandler(response: ClientResponse): Unit = {

Review comment:
   It turns out we do need to clear it. In the common case, the controller 
will still send out a LeaderAndIsr following a failed ISR update. However, 
during testing I did find a case where a LeaderAndIsr failed to go out, leading 
to a stuck leader.
   
   * ISR = {1,2}
   * Broker 3 comes up
   * Leader sends ISR = {1,2,3}
   * Broker 3 goes down
   * Controller handles broker shutdown
   * Controller handles AlterIsr, resulting LeaderAndIsr batch fails

[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-20 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r474070847



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+ val zkClient: KafkaZkClient,
+ val scheduler: Scheduler,
+ val brokerId: Int,
+ val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+pendingIsrUpdates synchronized {
+  pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+  lastIsrChangeMs.set(System.currentTimeMillis())
+  // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+  // in fast succession
+  if (scheduledRequest.isEmpty) {
+scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+  }
+}
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+pendingIsrUpdates synchronized {
+  // when we get a new LeaderAndIsr, we clear out any pending requests
+  pendingIsrUpdates.remove(topicPartition)
+}
+  }
+
+  override def startup(): Unit = {
+controllerChannelManager.start()
+  }
+
+  override def shutdown(): Unit = {
+controllerChannelManager.shutdown()
+  }
+
+  private def propagateIsrChanges(): Unit = {
+val now = System.currentTimeMillis()
+pendingIsrUpdates synchronized {
+  if (pendingIsrUpdates.nonEmpty) {
+// Max ISRs to send?
+val message = new AlterIsrRequestData()
+  .setBrokerId(brokerId)
+  .setBrokerEpoch(brokerEpoch)
+  .setTopics(new util.ArrayList())
+
+
pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, 
items) => {
+  val topicPart = new AlterIsrRequestTopics()
+.setName(topic)
+.setPartitions(new util.ArrayList())
+  message.topics().add(topicPart)
+  items.foreach(item => {
+topicPart.partitions().add(new AlterIsrRequestPartitions()
+  .setPartitionIndex(item.topicPartition.partition())
+  .setLeaderId(item.leaderAndIsr.leader)
+  .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+  .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+  .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+)
+  })
+})
+
+def responseHandler(response: ClientResponse): Unit = {

Review comment:
   It turns out we do need to clear it. In the common case, the controller 
will still send out a LeaderAndIsr following a failed ISR update. However, 
during testing I did find a case where a LeaderAndIsr failed to go out, leading 
to a stuck leader.
   
   * ISR = {1,2}
   * Broker 3 comes up
   * Leader sends ISR = {1,2,3}
   * Broker 3 goes down
   * Controller handles broker shutdown
   * Controller handles AlterIsr, resulting LeaderAndIsr batch fails

[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-14 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r470678123



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -255,6 +255,10 @@ class Partition(val topicPartition: TopicPartition,
 
   def isAddingReplica(replicaId: Int): Boolean = 
assignmentState.isAddingReplica(replicaId)
 
+  // For advancing the HW we assume the largest ISR even if the controller 
hasn't made the change yet
+  // This set includes the latest ISR (as we learned from LeaderAndIsr) and 
any replicas from a pending ISR expansion
+  def effectiveInSyncReplicaIds: Set[Int] = inSyncReplicaIds | 
pendingInSyncReplicaIds

Review comment:
   Since we are now only allowing one in-flight AlterIsr, I changed the 
semantics of pendingInSyncReplicaIds to be the maximal "effective" ISR. This 
way we don't need to compute it each time.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-14 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r470675300



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+ val zkClient: KafkaZkClient,
+ val scheduler: Scheduler,
+ val brokerId: Int,
+ val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+pendingIsrUpdates synchronized {
+  pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+  lastIsrChangeMs.set(System.currentTimeMillis())
+  // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+  // in fast succession
+  if (scheduledRequest.isEmpty) {
+scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+  }
+}
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+pendingIsrUpdates synchronized {
+  // when we get a new LeaderAndIsr, we clear out any pending requests
+  pendingIsrUpdates.remove(topicPartition)

Review comment:
   With the latest changes to prevent multiple in-flight requests, I don't 
think this should happen for a given partition. Even if it did, the retried 
in-flight request from BrokerToControllerRequestThread would fail on the 
controller with an old version. 
   
   I'm wondering if we even need this clearPending behavior. Since I changed 
the AlterIsr request to fire after at most 50ms, there is only a narrow window 
between enqueueing an ISR update and receiving a LeaderAndIsr. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-14 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r470673412



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+ val zkClient: KafkaZkClient,
+ val scheduler: Scheduler,
+ val brokerId: Int,
+ val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {

Review comment:
   Fixed this by adding getBrokerEpoch to KafkaZkClient





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-13 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r470063833



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -485,13 +490,11 @@ class Partition(val topicPartition: TopicPartition,
   def makeLeader(partitionState: LeaderAndIsrPartitionState,

Review comment:
   I don't think AlterIsr changes anything since we're now just sending the 
async ISR update where we were previously directly updating ZK. 
   
   Looking at the usages, `updateFollowerFetchState` is only called following a 
read (`Partition#readRecords`). These reads only happen on fetch requests and 
from the alter log dirs fetcher. I'm not sure about the alter log dirs flow, 
but as long as it sends the leader epoch, it should be safe.
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-13 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r470053173



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -569,8 +575,8 @@ class Partition(val topicPartition: TopicPartition,
   val oldLeaderEpoch = leaderEpoch
   // record the epoch of the controller that made the leadership decision. 
This is useful while updating the isr
   // to maintain the decision maker controller's epoch in the zookeeper 
path
-  controllerEpoch = partitionState.controllerEpoch
 
+  info(s"Follower ignoring ISR for $topicPartition")

Review comment:
   Yeah, lots of these log levels will be lowered; I was just doing this 
during development.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-12 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r469467881



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1210,28 +1218,20 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
-  private def expandIsr(newIsr: Set[Int]): Unit = {
-val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
-val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
-maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
-  }
+  private def expandIsr(newInSyncReplica: Int): Unit = {
+pendingInSyncReplicaIds += newInSyncReplica
+info(s"Adding new in-sync replica $newInSyncReplica. Pending ISR updated 
to [${pendingInSyncReplicaIds.mkString(",")}]")
 
-  private[cluster] def shrinkIsr(newIsr: Set[Int]): Unit = {
-val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
-val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
-maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
pendingInSyncReplicaIds.toList, zkVersion)
+alterIsrChannelManager.enqueueIsrUpdate(AlterIsrItem(topicPartition, 
newLeaderAndIsr))
   }
 
-  private[cluster] def maybeUpdateIsrAndVersion(isr: Set[Int], zkVersionOpt: 
Option[Int]): Unit = {
-zkVersionOpt match {
-  case Some(newVersion) =>
-inSyncReplicaIds = isr
-zkVersion = newVersion
-info("ISR updated to [%s] and zkVersion updated to 
[%d]".format(isr.mkString(","), zkVersion))
+  private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
+pendingInSyncReplicaIds --= outOfSyncReplicas

Review comment:
   I'm currently looking at the effective ISR to find new out-of-sync 
replicas. This can include new ISR members which haven't made it into the 
"true" ISR via LeaderAndIsr yet (like broker=3 in your example). Maybe we 
should only consider removing ISR members if they are in the true ISR. In 
other words, changing from
   
   ```scala
   val candidateReplicaIds = effectiveInSyncReplicaIds - localBrokerId
   ```
   to
   ```scala
   val candidateReplicaIds = inSyncReplicaIds - localBrokerId
   ```
   
   Also, I wonder if the batching that's happening in AlterIsrChannelManager 
violates the model. It sends the request asynchronously with a small delay, so 
multiple ISR changes can be batched into one AlterIsr.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-12 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r469460462



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L

Review comment:
   We don't; these were copied over from the ReplicaManager's ISR 
propagation logic. I'll clean this up.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465750632



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+ val zkClient: KafkaZkClient,
+ val scheduler: Scheduler,
+ val brokerId: Int,
+ val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+pendingIsrUpdates synchronized {
+  pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+  lastIsrChangeMs.set(System.currentTimeMillis())
+  // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+  // in fast succession
+  if (scheduledRequest.isEmpty) {
+scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+  }
+}
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+pendingIsrUpdates synchronized {
+  // when we get a new LeaderAndIsr, we clear out any pending requests
+  pendingIsrUpdates.remove(topicPartition)
+}
+  }
+
+  override def startup(): Unit = {
+controllerChannelManager.start()
+  }
+
+  override def shutdown(): Unit = {
+controllerChannelManager.shutdown()
+  }
+
+  private def propagateIsrChanges(): Unit = {
+val now = System.currentTimeMillis()
+pendingIsrUpdates synchronized {
+  if (pendingIsrUpdates.nonEmpty) {
+// Max ISRs to send?
+val message = new AlterIsrRequestData()
+  .setBrokerId(brokerId)
+  .setBrokerEpoch(brokerEpoch)
+  .setTopics(new util.ArrayList())
+
+
pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, 
items) => {
+  val topicPart = new AlterIsrRequestTopics()
+.setName(topic)
+.setPartitions(new util.ArrayList())
+  message.topics().add(topicPart)
+  items.foreach(item => {
+topicPart.partitions().add(new AlterIsrRequestPartitions()
+  .setPartitionIndex(item.topicPartition.partition())
+  .setLeaderId(item.leaderAndIsr.leader)
+  .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+  .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+  .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+)
+  })
+})
+
+def responseHandler(response: ClientResponse): Unit = {
+  
println(response.responseBody().toString(response.requestHeader().apiVersion()))
+  val body: AlterIsrResponse = 
response.responseBody().asInstanceOf[AlterIsrResponse]
+  val data: AlterIsrResponseData = body.data()
+  Errors.forCode(data.errorCode()) match {
+case Errors.NONE => info(s"Controller handled AlterIsr request")
+case e: Errors => warn(s"Controller returned an error when 
handling AlterIsr request: 

[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465750445



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+ val zkClient: KafkaZkClient,
+ val scheduler: Scheduler,
+ val brokerId: Int,
+ val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+pendingIsrUpdates synchronized {
+  pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+  lastIsrChangeMs.set(System.currentTimeMillis())
+  // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+  // in fast succession
+  if (scheduledRequest.isEmpty) {
+scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+  }
+}
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+pendingIsrUpdates synchronized {
+  // when we get a new LeaderAndIsr, we clear out any pending requests
+  pendingIsrUpdates.remove(topicPartition)
+}
+  }
+
+  override def startup(): Unit = {
+controllerChannelManager.start()
+  }
+
+  override def shutdown(): Unit = {
+controllerChannelManager.shutdown()
+  }
+
+  private def propagateIsrChanges(): Unit = {
+val now = System.currentTimeMillis()
+pendingIsrUpdates synchronized {
+  if (pendingIsrUpdates.nonEmpty) {
+// Max ISRs to send?
+val message = new AlterIsrRequestData()
+  .setBrokerId(brokerId)
+  .setBrokerEpoch(brokerEpoch)
+  .setTopics(new util.ArrayList())
+
+
pendingIsrUpdates.values.groupBy(_.topicPartition.topic()).foreachEntry((topic, 
items) => {
+  val topicPart = new AlterIsrRequestTopics()
+.setName(topic)
+.setPartitions(new util.ArrayList())
+  message.topics().add(topicPart)
+  items.foreach(item => {
+topicPart.partitions().add(new AlterIsrRequestPartitions()
+  .setPartitionIndex(item.topicPartition.partition())
+  .setLeaderId(item.leaderAndIsr.leader)
+  .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+  .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+  .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+)
+  })
+})
+
+def responseHandler(response: ClientResponse): Unit = {
+  
println(response.responseBody().toString(response.requestHeader().apiVersion()))

Review comment:
   Remove this





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465749546



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)

Review comment:
   Should probably get rid of this and change the method to 
`enqueueIsrUpdate(TopicPartition, LeaderAndIsr)`

##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)

Review comment:
   Should probably get rid of this and change the method to 
```enqueueIsrUpdate(TopicPartition, LeaderAndIsr)```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465749546



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr)

Review comment:
   Should probably get rid of this and change the method to 
   
   ```enqueueIsrUpdate(TopicPartition, LeaderAndIsr)```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465748120



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1771,6 +1776,141 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+alterIsrRequest.topics().forEach(topicReq => 
topicReq.partitions().forEach(partitionReq => {
+  val tp = new TopicPartition(topicReq.name(), 
partitionReq.partitionIndex())
+  val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+  isrsToAlter.put(tp, new LeaderAndIsr(partitionReq.leaderId(), 
partitionReq.leaderEpoch(), newIsr, partitionReq.currentIsrVersion()))
+}))
+
+def responseCallback(results: Either[Map[TopicPartition, Errors], 
Errors]): Unit = {
+  val resp = new AlterIsrResponseData()
+  results match {
+case Right(error) =>
+  resp.setErrorCode(error.code())
+case Left(partitions: Map[TopicPartition, Errors]) =>
+  resp.setTopics(new util.ArrayList())
+  partitions.groupBy(_._1.topic()).foreachEntry((topic, partitionMap) 
=> {
+val topicResp = new AlterIsrResponseTopics()
+  .setName(topic)
+  .setPartitions(new util.ArrayList())
+resp.topics().add(topicResp)
+partitionMap.foreachEntry((partition, error) => {
+  topicResp.partitions().add(
+new AlterIsrResponsePartitions()
+  .setPartitionIndex(partition.partition())
+  .setErrorCode(error.code()))
+})
+  })
+  }
+  callback.apply(resp)
+}
+
+eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId(), 
alterIsrRequest.brokerEpoch(), isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: 
Map[TopicPartition, LeaderAndIsr],
+  callback: AlterIsrCallback): Unit = {
+if (!isActive) {
+  callback.apply(Right(Errors.NOT_CONTROLLER))
+  return
+}
+
+val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+if (brokerEpochOpt.isEmpty) {
+  info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+  callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+  return
+}
+
+if (!brokerEpochOpt.contains(brokerEpoch)) {
+  info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for 
broker $brokerId")
+  callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+  return
+}
+
+val partitionErrors: mutable.Map[TopicPartition, Errors] = 
mutable.HashMap[TopicPartition, Errors]()
+
+val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap {
+  case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+val partitionError: Errors = 
controllerContext.partitionLeadershipInfo(tp) match {
+  case Some(leaderIsrAndControllerEpoch) =>
+val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+if (newLeaderAndIsr.leader != currentLeaderAndIsr.leader) {
+  Errors.NOT_LEADER_OR_FOLLOWER
+} else if (newLeaderAndIsr.leaderEpoch < 
currentLeaderAndIsr.leaderEpoch) {
+  Errors.FENCED_LEADER_EPOCH
+} else {
+  val currentAssignment = 
controllerContext.partitionReplicaAssignment(tp)
+  if (!newLeaderAndIsr.isr.forall(replicaId => 
currentAssignment.contains(replicaId))) {
+warn(s"Some of the proposed ISR are not in the assignment for 
partition $tp. Proposed ISR=$newLeaderAndIsr.isr assignment=$currentAssignment")
+Errors.INVALID_REQUEST
+  } else if (!newLeaderAndIsr.isr.forall(replicaId => 
controllerContext.isReplicaOnline(replicaId, tp))) {
+warn(s"Some of the proposed ISR are offline for partition $tp. 
Proposed ISR=$newLeaderAndIsr.isr")
+Errors.INVALID_REQUEST
+  } else {
+Errors.NONE
+  }
+}
+  case None => Errors.UNKNOWN_TOPIC_OR_PARTITION
+}
+if (partitionError == Errors.NONE) {
+  // Bump the leaderEpoch for partitions that we're going to write
+  Some(tp -> newLeaderAndIsr.newEpochAndZkVersion)
+} else {
+  partitionErrors.put(tp, partitionError)
+  None
+}
+}
+
+// Do the updates in ZK
+info(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
+val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) =  
zkClient.updateLeaderAndIsr(
+  adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
+
+val successfulUpdates: Map[TopicPartition, LeaderAndIsr] = 
finishedUpdates.flatMap {
+  case (partition

[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465748504



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,121 @@
+package kafka.server

Review comment:
   Yeah, maybe just "AlterIsrManager"?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465746162



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1771,6 +1776,141 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+alterIsrRequest.topics().forEach(topicReq => 
topicReq.partitions().forEach(partitionReq => {

Review comment:
   It might be simpler just to use AlterIsrRequestData and 
AlterIsrResponseData throughout this code (rather than converting to 
`Map[TopicPartition, LeaderAndIsr]` and `Map[TopicPartition, Errors]`)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-05 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465743922



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  // TODO is it okay to pull message classes down into the controller?
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+//val brokerEpochOpt = 
controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId())
+/*if (brokerEpochOpt.isEmpty) {
+  info(s"Ignoring AlterIsr due to unknown broker 
${alterIsrRequest.brokerId()}")
+  // TODO is INVALID_REQUEST a reasonable error here?
+  callback.apply(new 
AlterIsrResponseData().setErrorCode(Errors.INVALID_REQUEST.code))
+  return
+}
+
+if (!brokerEpochOpt.contains(alterIsrRequest.brokerEpoch())) {
+  info(s"Ignoring AlterIsr due to stale broker epoch 
${alterIsrRequest.brokerEpoch()} for broker ${alterIsrRequest.brokerId()}")
+  callback.apply(new 
AlterIsrResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
+  return
+}*/
+
+val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+val resp = new AlterIsrResponseData()
+resp.setTopics(new util.ArrayList())
+
+alterIsrRequest.topics().forEach(topicReq => {
+  val topicResp = new AlterIsrResponseTopics()
+.setName(topicReq.name())
+.setPartitions(new util.ArrayList())
+  resp.topics().add(topicResp)
+
+  topicReq.partitions().forEach(partitionReq => {
+val partitionResp = new AlterIsrResponsePartitions()
+  .setPartitionIndex(partitionReq.partitionIndex())
+topicResp.partitions().add(partitionResp)
+
+// For each partition whose ISR we are altering, let's do some upfront 
validation for the broker response

Review comment:
   @cmccabe good to know about `controllerContext` 👍 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-04 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465029352



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  // TODO is it okay to pull message classes down into the controller?
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+//val brokerEpochOpt = 
controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId())
+/*if (brokerEpochOpt.isEmpty) {
+  info(s"Ignoring AlterIsr due to unknown broker 
${alterIsrRequest.brokerId()}")
+  // TODO is INVALID_REQUEST a reasonable error here?
+  callback.apply(new 
AlterIsrResponseData().setErrorCode(Errors.INVALID_REQUEST.code))
+  return
+}
+
+if (!brokerEpochOpt.contains(alterIsrRequest.brokerEpoch())) {
+  info(s"Ignoring AlterIsr due to stale broker epoch 
${alterIsrRequest.brokerEpoch()} for broker ${alterIsrRequest.brokerId()}")
+  callback.apply(new 
AlterIsrResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
+  return
+}*/
+
+val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+val resp = new AlterIsrResponseData()
+resp.setTopics(new util.ArrayList())
+
+alterIsrRequest.topics().forEach(topicReq => {
+  val topicResp = new AlterIsrResponseTopics()
+.setName(topicReq.name())
+.setPartitions(new util.ArrayList())
+  resp.topics().add(topicResp)
+
+  topicReq.partitions().forEach(partitionReq => {
+val partitionResp = new AlterIsrResponsePartitions()
+  .setPartitionIndex(partitionReq.partitionIndex())
+topicResp.partitions().add(partitionResp)
+
+// For each partition whose ISR we are altering, let's do some upfront 
validation for the broker response

Review comment:
   Ah, I see what you mean. Initially, I was concerned about blocking for 
too long while waiting for a response, but it looks like there is precedent for 
this pattern for some requests (reassignment, leader election, controlled 
shutdown). I'll move this validation and the callback down into the event 
processor method





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-03 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r464717171



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  // TODO is it okay to pull message classes down into the controller?
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+//val brokerEpochOpt = 
controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId())
+/*if (brokerEpochOpt.isEmpty) {
+  info(s"Ignoring AlterIsr due to unknown broker 
${alterIsrRequest.brokerId()}")
+  // TODO is INVALID_REQUEST a reasonable error here?
+  callback.apply(new 
AlterIsrResponseData().setErrorCode(Errors.INVALID_REQUEST.code))
+  return
+}
+
+if (!brokerEpochOpt.contains(alterIsrRequest.brokerEpoch())) {
+  info(s"Ignoring AlterIsr due to stale broker epoch 
${alterIsrRequest.brokerEpoch()} for broker ${alterIsrRequest.brokerId()}")
+  callback.apply(new 
AlterIsrResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
+  return
+}*/
+
+val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+val resp = new AlterIsrResponseData()
+resp.setTopics(new util.ArrayList())
+
+alterIsrRequest.topics().forEach(topicReq => {
+  val topicResp = new AlterIsrResponseTopics()
+.setName(topicReq.name())
+.setPartitions(new util.ArrayList())
+  resp.topics().add(topicResp)
+
+  topicReq.partitions().forEach(partitionReq => {
+val partitionResp = new AlterIsrResponsePartitions()
+  .setPartitionIndex(partitionReq.partitionIndex())
+topicResp.partitions().add(partitionResp)
+
+// For each partition whose ISR we are altering, let's do some upfront 
validation for the broker response

Review comment:
   The main rationale for validating in the request handler is so we can 
return meaningful partition-level errors to the broker (fenced leader, not 
leader or follower, etc). Although, I'm not sure the broker could do anything 
useful with these errors since it probably has stale metadata in these cases.
   
   The KIP calls out four partition-level errors. Do we actually need them?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-03 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r464716040



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,121 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr, callback: Errors => Unit)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+ val zkClient: KafkaZkClient,
+ val scheduler: Scheduler,
+ val brokerId: Int,
+ val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Queue[AlterIsrItem] = new 
mutable.Queue[AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+pendingIsrUpdates synchronized {
+  pendingIsrUpdates += alterIsrItem
+  lastIsrChangeMs.set(System.currentTimeMillis())
+}
+  }
+
+  override def startup(): Unit = {
+scheduler.schedule("alter-isr-send", maybePropagateIsrChanges _, period = 
2500L, unit = TimeUnit.MILLISECONDS)

Review comment:
   That makes sense. I'll change that (this was pulled in from the previous 
ISR notification code in ReplicaManager)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org