dajac commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r839245649



##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +218,115 @@ class AbstractFetcherManagerTest {
     verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
     verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+    testResizeThreadPool(10, 50)
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+    testResizeThreadPool(50, 10)
+  }
+
+  private def testResizeThreadPool(currentFetcherSize: Int, newFetcherSize: Int, brokerNum: Int = 6): Unit = {
+    val fetchingTopicPartitions = makeTopicPartition(10, 100)
+    val failedTopicPartitions = makeTopicPartition(2, 5, "topic_failed")
+    val fetcherManager = new AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", "fetcher-manager", currentFetcherSize) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker, failedPartitions)
+      }
+    }
+    try {
+      fetcherManager.addFetcherForPartitions(fetchingTopicPartitions.map { tp =>
+        val brokerId = getBrokerId(tp, brokerNum)
+        val brokerEndPoint = new BrokerEndPoint(brokerId, s"kafka-host-$brokerId", 9092)
+        tp -> InitialFetchState(None, brokerEndPoint, 0, 0)
+      }.toMap)
+
+      // Mark some of these partitions failed within resizing scope
+      fetchingTopicPartitions.take(20).foreach(fetcherManager.addFailedPartition)
+      // Mark failed partitions out of resizing scope
+      failedTopicPartitions.foreach(fetcherManager.addFailedPartition)
+
+      fetcherManager.resizeThreadPool(newFetcherSize)
+
+      val ownedPartitions = mutable.Set.empty[TopicPartition]
+      fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, fetcherThread) =>
+        val fetcherId = brokerIdAndFetcherId.fetcherId
+        val brokerId = brokerIdAndFetcherId.brokerId
+
+        fetcherThread.partitions.foreach { tp =>
+          ownedPartitions += tp
+          assertEquals(fetcherManager.getFetcherId(tp), fetcherId)
+          assertEquals(getBrokerId(tp, brokerNum), brokerId)
+        }
+      }
+      // Verify that all partitions are owned by the fetcher threads.
+      assertEquals(fetchingTopicPartitions, ownedPartitions)
+
+      val failedPartitionsAfterResize = fetcherManager.failedPartitions.failedPartitions()
+      // Verify that failed partitions within resizing scope are removed, otherwise retained

Review comment:
       For what it's worth, I think that we could just remove the first assertion. If `failedPartitionsAfterResize` is equal to `failedTopicPartitions`, we know that it does not contain any of the `fetchingTopicPartitions`.
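
       A minimal sketch of the suggested simplification, within the scope of the test method above (the first assertion being removed is assumed to be a containment check over `fetchingTopicPartitions`; it is not shown in this hunk):

       ```scala
       // Keep only the equality assertion: if the set of failed partitions after the
       // resize equals failedTopicPartitions, it cannot contain any partition from
       // fetchingTopicPartitions, so a separate "does not contain" check is redundant.
       val failedPartitionsAfterResize = fetcherManager.failedPartitions.failedPartitions()
       assertEquals(failedTopicPartitions, failedPartitionsAfterResize)
       ```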


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

