jeffkbkim commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1010806354
########## core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala: ########## @@ -1100,6 +1100,64 @@ class ReplicaFetcherThreadTest { assertEquals(Collections.singletonList(tid1p0), fetchRequestBuilder2.removed()) } + @Test + def testFetchResponseWithPartitionData(): Unit = { + verifyLocalFetchCompletionAfterHighWatermarkUpdate(hasHighWatermarkChanged = true) + verifyLocalFetchCompletionAfterHighWatermarkUpdate(hasHighWatermarkChanged = false) + } + + private def verifyLocalFetchCompletionAfterHighWatermarkUpdate(hasHighWatermarkChanged: Boolean): Unit = { + val props = TestUtils.createBrokerConfig(1, "localhost:1234") + val config = KafkaConfig.fromProps(props) + val newHighWatermark = 100L + + val mockBlockingSend: BlockingSend = mock(classOf[BlockingSend]) + when(mockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint) + + val log: UnifiedLog = mock(classOf[UnifiedLog]) + when(log.updateHighWatermark(newHighWatermark)) + .thenReturn(HighWatermarkUpdate(newHighWatermark, hasHighWatermarkChanged)) + + val appendInfo: Option[LogAppendInfo] = Some(mock(classOf[LogAppendInfo])) Review Comment: `val appendInfo = Partition.appendRecordstoFollowerOrFutureReplica()` can only be None when isFuture = true, i.e. in `ReplicaAlterLogDirsThread.processPartitionData()`. removed the test case for it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org