[ https://issues.apache.org/jira/browse/KAFKA-6937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491384#comment-16491384 ]

ASF GitHub Bot commented on KAFKA-6937:
---------------------------------------

junrao closed pull request #5074: KAFKA-6937: In-sync replica delayed during fetch if replica throttle …
URL: https://github.com/apache/kafka/pull/5074

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index e478053792d..49bea5a6c1b 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -77,7 +77,6 @@ class DelayedFetch(delayMs: Long,
    */
   override def tryComplete() : Boolean = {
     var accumulatedSize = 0
-    var accumulatedThrottledSize = 0
     fetchMetadata.fetchPartitionStatus.foreach {
       case (topicPartition, fetchStatus) =>
         val fetchOffset = fetchStatus.startOffsetMetadata
@@ -110,9 +109,7 @@ class DelayedFetch(delayMs: Long,
               } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
                 // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
                 val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
-                if (quota.isThrottled(topicPartition))
-                  accumulatedThrottledSize += bytesAvailable
-                else
+                if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
                   accumulatedSize += bytesAvailable
               }
             }
@@ -131,9 +128,8 @@ class DelayedFetch(delayMs: Long,
     }
 
     // Case D
-    if (accumulatedSize >= fetchMetadata.fetchMinBytes
-      || ((accumulatedSize + accumulatedThrottledSize) >= fetchMetadata.fetchMinBytes && !quota.isQuotaExceeded()))
-      forceComplete()
+    if (accumulatedSize >= fetchMetadata.fetchMinBytes)
+       forceComplete()
     else
       false
   }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5532bf1e9a8..8c17a82089d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -666,6 +666,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     override def remove() = throw new UnsupportedOperationException()
   }
 
+  // Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication
+  // traffic doesn't exceed quota.
   private def sizeOfThrottledPartitions(versionId: Short,
                                         unconvertedResponse: FetchResponse,
                                         quota: ReplicationQuotaManager): Int = {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 6805d77d47f..68fa8734b78 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -126,6 +126,9 @@ class ReplicaFetcherThread(name: String,
     replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
     if (isTraceEnabled)
       trace(s"Follower set replica high watermark for partition 
$topicPartition to $followerHighWatermark")
+
+    // Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication
+    // traffic doesn't exceed quota.
     if (quota.isThrottled(topicPartition))
       quota.record(records.sizeInBytes)
     replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 334321ae230..c6efca573de 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -148,6 +148,38 @@ class ReplicaManagerQuotasTest {
      fetch.find(_._1 == topicPartition2).get._2.info.records.batches.asScala.size)
   }
 
+  @Test
+  def testCompleteInDelayedFetchWithReplicaThrottling(): Unit = {
+    // Set up DelayedFetch where there is data to return to a follower replica, either in-sync or out of sync
+    def setupDelayedFetch(isReplicaInSync: Boolean): DelayedFetch = {
+      val logOffsetMetadata = new LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 0L, relativePositionInSegment = 500)
+      val replica = EasyMock.createMock(classOf[Replica])
+      EasyMock.expect(replica.logEndOffset).andReturn(logOffsetMetadata).anyTimes()
+      EasyMock.replay(replica)
+
+      val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
+      EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(EasyMock.anyObject[TopicPartition])).andReturn(replica).anyTimes()
+      EasyMock.expect(replicaManager.shouldLeaderThrottle(EasyMock.anyObject[ReplicaQuota], EasyMock.anyObject[TopicPartition], EasyMock.anyObject[Int]))
+        .andReturn(!isReplicaInSync).anyTimes()
+      EasyMock.replay(replicaManager)
+
+      val tp = new TopicPartition("t1", 0)
+      val fetchParititonStatus = new FetchPartitionStatus(new LogOffsetMetadata(messageOffset = 50L, segmentBaseOffset = 0L,
+        relativePositionInSegment = 250), new PartitionData(50, 0, 1))
+      val fetchMetadata = new FetchMetadata(fetchMinBytes = 1, fetchMaxBytes = 1000, hardMaxBytesLimit = true, fetchOnlyLeader = true,
+        fetchOnlyCommitted = false, isFromFollower = true, replicaId = 1, fetchPartitionStatus = List((tp, fetchParititonStatus)))
+      new DelayedFetch(delayMs = 600, fetchMetadata = fetchMetadata, replicaManager = replicaManager,
+        quota = null, isolationLevel = IsolationLevel.READ_UNCOMMITTED, responseCallback = null) {
+        override def forceComplete(): Boolean = {
+          true
+        }
+      }
+    }
+
+    assertTrue("In sync replica should complete", 
setupDelayedFetch(isReplicaInSync = true).tryComplete())
+    assertFalse("Out of sync replica should not complete", 
setupDelayedFetch(isReplicaInSync = false).tryComplete())
+  }
+
   def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: SimpleRecord = this.record, bothReplicasInSync: Boolean = false) {
     val zkClient = EasyMock.createMock(classOf[KafkaZkClient])
     val scheduler = createNiceMock(classOf[KafkaScheduler])
@@ -208,7 +240,8 @@ class ReplicaManagerQuotasTest {
 
   @After
   def tearDown() {
-    replicaManager.shutdown(false)
+    if (replicaManager != null)
+      replicaManager.shutdown(false)
     metrics.close()
   }
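
For illustration only (not part of the patch above): a minimal, self-contained Scala sketch of the accounting idea behind the two new comments in KafkaApis and ReplicaFetcherThread. The names below (ReplicationQuota, shouldLeaderThrottle) are simplified stand-ins for the real Kafka classes. Bytes fetched for a throttled partition are recorded against the replication quota whether the follower is in sync or not, so the measured replication rate stays accurate; only the throttling decision itself exempts in-sync followers.

// Standalone sketch with simplified stand-in types; not Kafka code.
object ReplicationQuotaSketch {
  // Toy replication quota: accumulates recorded bytes against a fixed limit.
  final class ReplicationQuota(limitBytes: Long) {
    private var recorded = 0L
    def record(bytes: Long): Unit = recorded += bytes    // accounting side
    def isQuotaExceeded: Boolean = recorded > limitBytes // input to throttling
  }

  // Leader-side throttling decision (mirrors the intent of shouldLeaderThrottle):
  // only out-of-sync followers on throttled partitions are throttled.
  def shouldLeaderThrottle(partitionThrottled: Boolean, followerInSync: Boolean): Boolean =
    partitionThrottled && !followerInSync

  def main(args: Array[String]): Unit = {
    val quota = new ReplicationQuota(limitBytes = 1000)

    // An in-sync and an out-of-sync follower each fetch 800 bytes from a
    // throttled partition; both fetches are recorded, so total replication
    // traffic is measured correctly ...
    quota.record(800) // in-sync follower
    quota.record(800) // out-of-sync follower
    println(quota.isQuotaExceeded) // true

    // ... but only the out-of-sync follower is subject to throttling.
    println(shouldLeaderThrottle(partitionThrottled = true, followerInSync = true))  // false
    println(shouldLeaderThrottle(partitionThrottled = true, followerInSync = false)) // true
  }
}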
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> In-sync replica delayed during fetch if replica throttle is exceeded
> --------------------------------------------------------------------
>
>                 Key: KAFKA-6937
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6937
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.11.0.1, 1.1.0, 1.0.1
>            Reporter: Jun Rao
>            Assignee: Jun Rao
>            Priority: Major
>
> When replication throttling is enabled, in-sync replica's traffic should 
> never be throttled. However, in DelayedFetch.tryComplete(), we incorrectly 
> delay the completion of an in-sync replica fetch request if replication 
> throttling is engaged. 
> The impact is that the producer may see increased latency if acks = all. The 
> delivery of the message to the consumer may also be delayed.
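
For illustration only (not in the original report): a minimal, self-contained Scala sketch of the completion check in DelayedFetch.tryComplete() before and after the fix, using simplified stand-in types rather than the real DelayedFetch/ReplicaManager classes. It shows how the old check could delay an in-sync follower once the replication quota was exhausted, while the fixed check counts every partition the leader should not throttle toward fetchMinBytes.

// Standalone sketch with simplified stand-in types; not Kafka code.
object DelayedFetchSketch {
  final case class PartitionState(bytesAvailable: Long, followerInSync: Boolean, throttled: Boolean)

  // Pre-fix logic: throttled bytes were accumulated separately and only counted
  // toward completion while the replication quota was not exceeded, so even an
  // in-sync follower could be delayed once the quota was exhausted.
  def tryCompleteOld(parts: Seq[PartitionState], fetchMinBytes: Long, quotaExceeded: Boolean): Boolean = {
    val (throttled, unthrottled) = parts.partition(_.throttled)
    val accumulatedSize = unthrottled.map(_.bytesAvailable).sum
    val accumulatedThrottledSize = throttled.map(_.bytesAvailable).sum
    accumulatedSize >= fetchMinBytes ||
      (accumulatedSize + accumulatedThrottledSize >= fetchMinBytes && !quotaExceeded)
  }

  // Post-fix logic: skip a partition only if the leader should actually throttle
  // it for this replica (throttled AND out of sync), so an in-sync follower's
  // bytes always count toward fetchMinBytes.
  def tryCompleteNew(parts: Seq[PartitionState], fetchMinBytes: Long): Boolean = {
    def shouldLeaderThrottle(p: PartitionState): Boolean = p.throttled && !p.followerInSync
    val accumulatedSize = parts.filterNot(shouldLeaderThrottle).map(_.bytesAvailable).sum
    accumulatedSize >= fetchMinBytes
  }

  def main(args: Array[String]): Unit = {
    val inSyncOnThrottledPartition = Seq(PartitionState(bytesAvailable = 500, followerInSync = true, throttled = true))
    // With the quota exhausted, the old check delays the in-sync follower ...
    println(tryCompleteOld(inSyncOnThrottledPartition, fetchMinBytes = 1, quotaExceeded = true)) // false
    // ... while the fixed check completes it immediately.
    println(tryCompleteNew(inSyncOnThrottledPartition, fetchMinBytes = 1)) // true
  }
}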



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
