Repository: kafka
Updated Branches:
  refs/heads/0.10.1 6a255f81c -> 9c4f1d54f


KAFKA-4313; ISRs may thrash when replication quota is enabled

Author: Jun Rao <[email protected]>

Reviewers: Ben Stopford <[email protected]>, Ismael Juma <[email protected]>

Closes #2043 from junrao/kafka-4313

(cherry picked from commit 24067e40764d91e1a6b2d80be45407841bbb72a2)
Signed-off-by: Ismael Juma <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9c4f1d54
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9c4f1d54
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9c4f1d54

Branch: refs/heads/0.10.1
Commit: 9c4f1d54f52afdb3eb1e4127b671f64c8ea4d939
Parents: 6a255f8
Author: Jun Rao <[email protected]>
Authored: Thu Oct 20 10:38:11 2016 +0100
Committer: Ismael Juma <[email protected]>
Committed: Thu Oct 20 11:10:49 2016 +0100

----------------------------------------------------------------------
 .../kafka/server/AbstractFetcherThread.scala    |  8 +++
 .../main/scala/kafka/server/DelayedFetch.scala  | 17 +++--
 .../kafka/server/ReplicaFetcherThread.scala     | 12 +++-
 .../scala/kafka/server/ReplicaManager.scala     | 30 ++++++--
 .../kafka/server/ReplicaManagerQuotasTest.scala | 74 +++++++++++++++++---
 .../unit/kafka/server/SimpleFetchTest.scala     | 18 ++++-
 6 files changed, 133 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9c4f1d54/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 2f2cb4b..30f5125 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -304,6 +304,14 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
     stats.getAndMaybePut(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
   }
 
+  def isReplicaInSync(topic: String, partitionId: Int): Boolean = {
+    val fetcherLagMetrics = stats.get(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
+    if (fetcherLagMetrics != null)
+      fetcherLagMetrics.lag <= 0
+    else
+      false
+  }
+
   def unregister(topic: String, partitionId: Int) {
     val lagMetrics = stats.remove(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
     if (lagMetrics != null) lagMetrics.unregister()

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c4f1d54/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 4b17e81..2feeae8 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -42,6 +42,7 @@ case class FetchMetadata(fetchMinBytes: Int,
                          fetchOnlyLeader: Boolean,
                          fetchOnlyCommitted: Boolean,
                          isFromFollower: Boolean,
+                         replicaId: Int,
                          fetchPartitionStatus: Seq[(TopicAndPartition, FetchPartitionStatus)]) {
 
   override def toString = "[minBytes: " + fetchMinBytes + ", " +
@@ -97,7 +98,8 @@ class DelayedFetch(delayMs: Long,
                 // Case C, this can happen when the fetch operation is falling behind the current segment
                 // or the partition has just rolled a new segment
                 debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
-                if (!(quota.isThrottled(topicAndPartition) && quota.isQuotaExceeded()))
+                // We will not force complete the fetch request if a replica should be throttled.
+                if (!replicaManager.shouldLeaderThrottle(quota, topicAndPartition, fetchMetadata.replicaId))
                   return forceComplete()
               } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
                 // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
@@ -139,12 +141,13 @@ class DelayedFetch(delayMs: Long,
    */
   override def onComplete() {
     val logReadResults = replicaManager.readFromLocalLog(
-      fetchMetadata.fetchOnlyLeader,
-      fetchMetadata.fetchOnlyCommitted,
-      fetchMetadata.fetchMaxBytes,
-      fetchMetadata.hardMaxBytesLimit,
-      fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
-      quota
+      replicaId = fetchMetadata.replicaId,
+      fetchOnlyFromLeader = fetchMetadata.fetchOnlyLeader,
+      readOnlyCommitted = fetchMetadata.fetchOnlyCommitted,
+      fetchMaxBytes = fetchMetadata.fetchMaxBytes,
+      hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
+      readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
+      quota = quota
     )
 
     val fetchPartitionData = logReadResults.map { case (tp, result) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c4f1d54/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 7930716..6f4c589 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -286,10 +286,10 @@ class ReplicaFetcherThread(name: String,
   protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
     val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
 
-    val quotaExceeded = quota.isQuotaExceeded
     partitionMap.foreach { case (topicPartition, partitionFetchState) =>
       val topicAndPartition = new TopicAndPartition(topicPartition.topic, topicPartition.partition)
-      if (partitionFetchState.isActive && !(quota.isThrottled(topicAndPartition) && quotaExceeded))
+      // We will not include a replica in the fetch request if it should be throttled.
+      if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicAndPartition))
         requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))
     }
 
@@ -300,6 +300,14 @@ class ReplicaFetcherThread(name: String,
     new FetchRequest(request)
   }
 
+  /**
+   *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   *  the quota is exceeded and the replica is not in sync.
+   */
+  private def shouldFollowerThrottle(quota: ReplicaQuota, topicPartition: TopicAndPartition): Boolean = {
+    val isReplicaInSync = fetcherLagStats.isReplicaInSync(topicPartition.topic, topicPartition.partition)
+    quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
+  }
 }
 
 object ReplicaFetcherThread {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c4f1d54/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2c843e8..32bc660 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -466,8 +466,14 @@ class ReplicaManager(val config: KafkaConfig,
     val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
 
     // read from local logs
-    val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchMaxBytes, hardMaxBytesLimit,
-      fetchInfos, quota)
+    val logReadResults = readFromLocalLog(
+      replicaId = replicaId,
+      fetchOnlyFromLeader = fetchOnlyFromLeader,
+      readOnlyCommitted = fetchOnlyCommitted,
+      fetchMaxBytes = fetchMaxBytes,
+      hardMaxBytesLimit = hardMaxBytesLimit,
+      readPartitionInfo = fetchInfos,
+      quota = quota)
 
     // if the fetch comes from the follower,
     // update its corresponding log end offset
@@ -498,7 +504,7 @@ class ReplicaManager(val config: KafkaConfig,
         (topicAndPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
       }
       val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
-        fetchOnlyCommitted, isFromFollower, fetchPartitionStatus)
+        fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus)
       val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)
 
       // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
@@ -514,7 +520,8 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Read from multiple topic partitions at the given offset up to maxSize bytes
    */
-  def readFromLocalLog(fetchOnlyFromLeader: Boolean,
+  def readFromLocalLog(replicaId: Int,
+                       fetchOnlyFromLeader: Boolean,
                        readOnlyCommitted: Boolean,
                        fetchMaxBytes: Int,
                        hardMaxBytesLimit: Boolean,
@@ -559,8 +566,8 @@ class ReplicaManager(val config: KafkaConfig,
             // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
             val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage)
 
-            // If the partition is marked as throttled, and we are over-quota then exclude it
-            if (quota.isThrottled(tp) && quota.isQuotaExceeded)
+            // If the partition is being throttled, simply return an empty set.
+            if (shouldLeaderThrottle(quota, tp, replicaId))
               FetchDataInfo(fetch.fetchOffsetMetadata, MessageSet.Empty)
             // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
             // progress in such cases and don't need to report a `RecordTooLargeException`
@@ -607,6 +614,17 @@ class ReplicaManager(val config: KafkaConfig,
     result
   }
 
+  /**
+   *  To avoid ISR thrashing, we only throttle a replica on the leader if it's in the throttled replica list,
+   *  the quota is exceeded and the replica is not in sync.
+   */
+  def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicAndPartition, replicaId: Int): Boolean = {
+    val isReplicaInSync = getPartition(topicPartition.topic, topicPartition.partition).flatMap { partition =>
+      partition.getReplica(replicaId).map(partition.inSyncReplicas.contains)
+    }.getOrElse(false)
+    quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
+  }
+
   def getMessageFormatVersion(topicAndPartition: TopicAndPartition): Option[Byte] =
     getReplica(topicAndPartition.topic, topicAndPartition.partition).flatMap { replica =>
       replica.log.map(_.config.messageFormatVersion.messageFormatVersion)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c4f1d54/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index f8f8dda..17e0516 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -48,13 +48,21 @@ class ReplicaManagerQuotasTest {
   @Test
   def shouldExcludeSubsequentThrottledPartitions(): Unit = {
     setUpMocks(fetchInfo)
+    val followerReplicaId = configs.last.brokerId
 
     val quota = mockQuota(1000000)
     expect(quota.isQuotaExceeded()).andReturn(false).once()
     expect(quota.isQuotaExceeded()).andReturn(true).once()
     replay(quota)
 
-    val fetch = replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, quota)
+    val fetch = replicaManager.readFromLocalLog(
+      replicaId = followerReplicaId,
+      fetchOnlyFromLeader = true,
+      readOnlyCommitted = true,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = fetchInfo,
+      quota = quota)
     assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
       fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
 
@@ -65,13 +73,21 @@ class ReplicaManagerQuotasTest {
   @Test
   def shouldGetNoMessagesIfQuotasExceededOnSubsequentPartitions(): Unit = {
     setUpMocks(fetchInfo)
+    val followerReplicaId = configs.last.brokerId
 
     val quota = mockQuota(1000000)
     expect(quota.isQuotaExceeded()).andReturn(true).once()
     expect(quota.isQuotaExceeded()).andReturn(true).once()
     replay(quota)
 
-    val fetch = replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, quota)
+    val fetch = replicaManager.readFromLocalLog(
+      replicaId = followerReplicaId,
+      fetchOnlyFromLeader = true,
+      readOnlyCommitted = true,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = fetchInfo,
+      quota = quota)
     assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
       fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
     assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
@@ -81,20 +97,53 @@ class ReplicaManagerQuotasTest {
   @Test
   def shouldGetBothMessagesIfQuotasAllow(): Unit = {
     setUpMocks(fetchInfo)
+    val followerReplicaId = configs.last.brokerId
 
     val quota = mockQuota(1000000)
     expect(quota.isQuotaExceeded()).andReturn(false).once()
     expect(quota.isQuotaExceeded()).andReturn(false).once()
     replay(quota)
 
-    val fetch = replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, quota)
+    val fetch = replicaManager.readFromLocalLog(
+      replicaId = followerReplicaId,
+      fetchOnlyFromLeader = true,
+      readOnlyCommitted = true,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = fetchInfo,
+      quota = quota)
     assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
       fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
     assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
       fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size)
   }
 
-  def setUpMocks(fetchInfo: Seq[(TopicAndPartition, PartitionFetchInfo)], message: Message = this.message) {
+  @Test
+  def shouldIncludeInSyncThrottledReplicas(): Unit = {
+    setUpMocks(fetchInfo, bothReplicasInSync = true)
+    val followerReplicaId = configs.last.brokerId
+
+    val quota = mockQuota(1000000)
+    expect(quota.isQuotaExceeded()).andReturn(false).once()
+    expect(quota.isQuotaExceeded()).andReturn(true).once()
+    replay(quota)
+
+    val fetch = replicaManager.readFromLocalLog(
+      replicaId = followerReplicaId,
+      fetchOnlyFromLeader = true,
+      readOnlyCommitted = true,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = fetchInfo,
+      quota = quota)
+    assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
+      fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
+
+    assertEquals("But we should get the second too since it's throttled but in sync", 1,
+      fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size)
+  }
+
+  def setUpMocks(fetchInfo: Seq[(TopicAndPartition, PartitionFetchInfo)], message: Message = this.message, bothReplicasInSync: Boolean = false) {
     val zkUtils = createNiceMock(classOf[ZkUtils])
     val scheduler = createNiceMock(classOf[KafkaScheduler])
 
@@ -131,12 +180,19 @@ class ReplicaManagerQuotasTest {
     //create the two replicas
     for ((p, _) <- fetchInfo) {
       val partition = replicaManager.getOrCreatePartition(p.topic, p.partition)
-      val replica = new Replica(configs.head.brokerId, partition, time, 0, Some(log))
-      replica.highWatermark = new LogOffsetMetadata(5)
-      partition.leaderReplicaIdOpt = Some(replica.brokerId)
-      val allReplicas = List(replica)
+      val leaderReplica = new Replica(configs.head.brokerId, partition, time, 0, Some(log))
+      leaderReplica.highWatermark = new LogOffsetMetadata(5)
+      partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId)
+      val followerReplica = new Replica(configs.last.brokerId, partition, time, 0, Some(log))
+      val allReplicas = Set(leaderReplica, followerReplica)
       allReplicas.foreach(partition.addReplicaIfNotExists(_))
-      partition.inSyncReplicas = allReplicas.toSet
+      if (bothReplicasInSync) {
+        partition.inSyncReplicas = allReplicas
+        followerReplica.highWatermark = new LogOffsetMetadata(5)
+      } else {
+        partition.inSyncReplicas = Set(leaderReplica)
+        followerReplica.highWatermark = new LogOffsetMetadata(0)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c4f1d54/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 71c2b41..cbd751b 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -149,9 +149,23 @@ class SimpleFetchTest {
     val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()
 
     assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW,
-      replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, fetchInfo, UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
+      replicaManager.readFromLocalLog(
+        replicaId = Request.OrdinaryConsumerId,
+        fetchOnlyFromLeader = true,
+        readOnlyCommitted = true,
+        fetchMaxBytes = Int.MaxValue,
+        hardMaxBytesLimit = false,
+        readPartitionInfo = fetchInfo,
+        quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
     assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO,
-      replicaManager.readFromLocalLog(true, false, Int.MaxValue, false, fetchInfo, UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
+      replicaManager.readFromLocalLog(
+        replicaId = Request.OrdinaryConsumerId,
+        fetchOnlyFromLeader = true,
+        readOnlyCommitted = false,
+        fetchMaxBytes = Int.MaxValue,
+        hardMaxBytesLimit = false,
+        readPartitionInfo = fetchInfo,
+        quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
 
     assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count())
     assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count())

Reply via email to