kamalcph commented on code in PR #20045:
URL: https://github.com/apache/kafka/pull/20045#discussion_r2182287587
##########
core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala:
##########
@@ -235,7 +471,8 @@ class DelayedRemoteFetchTest {
                       highWatermark: Int = 0,
                       leaderLogStartOffset: Int = 0): LogReadResult = {
     new LogReadResult(
-      new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
+      new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY, false, Optional.empty(),
+        Optional.of(mock(classOf[RemoteStorageFetchInfo]))),

Review Comment:
   Thanks for making this change! Previously, it was reading the results from `localReadResults`.

##########
core/src/main/scala/kafka/server/DelayedRemoteFetch.scala:
##########
@@ -81,17 +82,23 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
           return forceComplete()
         }
       }
-    if (remoteFetchResult.isDone) // Case c
-      forceComplete()
-    else
+    // Case c
+    if (remoteFetchResults.values().stream().anyMatch(taskResult => !taskResult.isDone))

Review Comment:
   nit: subjective

   Can the condition be inverted?
   ```
   // Case c
   if (remoteFetchResults.values().stream().allMatch(taskResult => taskResult.isDone))
     forceComplete()
   else
     false
   ```
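   For illustration, a self-contained sketch (using plain `CompletableFuture` stand-ins rather than the real `DelayedRemoteFetch` types) showing that the suggested `allMatch` form is the exact complement of the `anyMatch` check:
   ```scala
   import java.util.concurrent.CompletableFuture

   object AllMatchSketch extends App {
     // Hypothetical stand-ins for the per-partition remote fetch result futures
     val results = new java.util.HashMap[String, CompletableFuture[String]]()
     results.put("tp0", CompletableFuture.completedFuture("done"))
     results.put("tp1", new CompletableFuture[String]()) // still pending

     // Form in the PR: true while any task is still pending
     val anyPending = results.values().stream().anyMatch(f => !f.isDone)
     // Inverted form suggested above: true only once every task is done
     val allDone = results.values().stream().allMatch(f => f.isDone)

     // The two conditions are complements, so either shape works
     assert(anyPending == !allDone)
     println(s"anyPending=$anyPending, allDone=$allDone")
   }
   ```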
##########
core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala:
##########
@@ -170,52 +206,252 @@ class DelayedRemoteFetchTest {
   @Test
   def testRequestExpiry(): Unit = {
-    var actualTopicPartition: Option[TopicIdPartition] = None
-    var fetchResultOpt: Option[FetchPartitionData] = None
+    val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()

-    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
-      assertEquals(1, responses.size)
-      actualTopicPartition = Some(responses.head._1)
-      fetchResultOpt = Some(responses.head._2)
+    def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      responseSeq.foreach { case (tp, data) =>
+        responses.put(tp, data)
+      }
     }

+    def expiresPerSecValue(): Double = {
+      val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+      val metric = allMetrics.find { case (n, _) => n.getMBeanName.endsWith("kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec") }
+
+      if (metric.isEmpty)
+        0
+      else
+        metric.get._2.asInstanceOf[Meter].count
+    }
+
+    val remoteFetchTaskExpired = mock(classOf[Future[Void]])
+    val remoteFetchTask2 = mock(classOf[Future[Void]])
+    // complete the 2nd task, and keep the 1st one expired
+    when(remoteFetchTask2.isDone).thenReturn(true)
+
+    // Create futures - one completed, one not
+    val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
+    val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
+    // Only complete one remote fetch
+    future2.complete(buildRemoteReadResult(Errors.NONE))
+
+    val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
+    val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null)
+
     val highWatermark = 100
     val leaderLogStartOffset = 10
-    val remoteFetchTask = mock(classOf[Future[Void]])
-    val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
-    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null)
-    val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
-
-    val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, fetchInfo, remoteFetchMaxWaitMs,
-      Seq(topicIdPartition -> fetchStatus), fetchParams,
-      Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+    val logReadInfo1 = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
+    val logReadInfo2 = buildReadResult(Errors.NONE)
+
+    val fetchStatus1 = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchStatus2 = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
+      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch))
+
+    // Set up maps for multiple partitions
+    val remoteFetchTasks = new java.util.HashMap[TopicIdPartition, Future[Void]]()
+    val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]()
+    val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
+
+    remoteFetchTasks.put(topicIdPartition, remoteFetchTaskExpired)
+    remoteFetchTasks.put(topicIdPartition2, remoteFetchTask2)
+    remoteFetchResults.put(topicIdPartition, future1)
+    remoteFetchResults.put(topicIdPartition2, future2)
+    remoteFetchInfos.put(topicIdPartition, fetchInfo1)
+    remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
+
+    val delayedRemoteFetch = new DelayedRemoteFetch(
+      remoteFetchTasks,
+      remoteFetchResults,
+      remoteFetchInfos,
+      remoteFetchMaxWaitMs,
+      Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
+      fetchParams,
+      Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
+      replicaManager,
+      callback)

     when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
       .thenReturn(mock(classOf[Partition]))
+    when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
+      .thenReturn(mock(classOf[Partition]))

     // Verify that the ExpiresPerSec metric is zero before fetching
-    val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
-    assertEquals(0, metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec"))
+    val existingMetricVal = expiresPerSecValue()
+    // Verify the delayedRemoteFetch is not completed yet
+    assertFalse(delayedRemoteFetch.isCompleted)

     // Force the delayed remote fetch to expire
     delayedRemoteFetch.run()

-    // Check that the task was cancelled and force-completed
-    verify(remoteFetchTask).cancel(false)
+    // Check that the expired task was cancelled and force-completed
+    verify(remoteFetchTaskExpired).cancel(anyBoolean())
+    verify(remoteFetchTask2, never()).cancel(anyBoolean())
     assertTrue(delayedRemoteFetch.isCompleted)

     // Check that the ExpiresPerSec metric was incremented
-    assertEquals(1, metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec"))
+    assertTrue(expiresPerSecValue() > existingMetricVal)

-    // Fetch results should still include local read results
-    assertTrue(actualTopicPartition.isDefined)
-    assertEquals(topicIdPartition, actualTopicPartition.get)
-    assertTrue(fetchResultOpt.isDefined)
+    // Fetch results should include 2 results and the expired one should return local read results
+    assertEquals(2, responses.size)
+    assertTrue(responses.contains(topicIdPartition))
+    assertTrue(responses.contains(topicIdPartition2))

-    val fetchResult = fetchResultOpt.get
-    assertEquals(Errors.NONE, fetchResult.error)
-    assertEquals(highWatermark, fetchResult.highWatermark)
-    assertEquals(leaderLogStartOffset, fetchResult.logStartOffset)
+    assertEquals(Errors.NONE, responses(topicIdPartition).error)
+    assertEquals(highWatermark, responses(topicIdPartition).highWatermark)
+    assertEquals(leaderLogStartOffset, responses(topicIdPartition).logStartOffset)
+
+    assertEquals(Errors.NONE, responses(topicIdPartition2).error)
+  }
+
+  @Test
+  def testMultiplePartitions(): Unit = {
+    val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
+
+    def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      responseSeq.foreach { case (tp, data) =>
+        responses.put(tp, data)
+      }
+    }
+
+    // Create futures - one completed, one not
+    val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
+    val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
+    // Only complete one remote fetch
+    future1.complete(buildRemoteReadResult(Errors.NONE))
+
+    val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
+    val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null)
+
+    val logReadInfo1 = buildReadResult(Errors.NONE, 100, 10)
+    val logReadInfo2 = buildReadResult(Errors.NONE, 200, 20)
+
+    val fetchStatus1 = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchStatus2 = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
+      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch))
+
+    // Set up maps for multiple partitions
+    val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]()
+    val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
+
+    remoteFetchResults.put(topicIdPartition, future1)
+    remoteFetchResults.put(topicIdPartition2, future2)
+    remoteFetchInfos.put(topicIdPartition, fetchInfo1)
+    remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
+
+    val delayedRemoteFetch = new DelayedRemoteFetch(
+      Collections.emptyMap[TopicIdPartition, Future[Void]](),
+      remoteFetchResults,
+      remoteFetchInfos,
+      remoteFetchMaxWaitMs,
+      Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
+      fetchParams,
+      Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
+      replicaManager,
+      callback)
+
+    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+      .thenReturn(mock(classOf[Partition]))
+    when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
+      .thenReturn(mock(classOf[Partition]))
+
+    // Should not complete since future2 is not done
+    assertFalse(delayedRemoteFetch.tryComplete())
+    assertFalse(delayedRemoteFetch.isCompleted)
+
+    // Complete the other future
+    future2.complete(buildRemoteReadResult(Errors.NONE))
+
+    // Now it should complete
+    assertTrue(delayedRemoteFetch.tryComplete())
+    assertTrue(delayedRemoteFetch.isCompleted)
+
+    // Verify both partitions were processed without error
+    assertEquals(2, responses.size)
+    assertTrue(responses.contains(topicIdPartition))
+    assertTrue(responses.contains(topicIdPartition2))
+
+    assertEquals(Errors.NONE, responses(topicIdPartition).error)
+    assertEquals(Errors.NONE, responses(topicIdPartition2).error)

Review Comment:
   Can the `highWatermark` and `logStartOffset` also be asserted? They should also be filled in the response.
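   For example, a minimal sketch of the extra assertions, assuming `buildReadResult` propagates the high watermark and log start offset passed above (100/10 for the first partition, 200/20 for the second) into the returned `FetchPartitionData`:
   ```scala
   // Hypothetical assertions mirroring the buildReadResult arguments above
   assertEquals(100, responses(topicIdPartition).highWatermark)
   assertEquals(10, responses(topicIdPartition).logStartOffset)
   assertEquals(200, responses(topicIdPartition2).highWatermark)
   assertEquals(20, responses(topicIdPartition2).logStartOffset)
   ```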
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1598,31 +1601,40 @@ class ReplicaManager(val config: KafkaConfig,
         })
       } catch {
         case e: RejectedExecutionException =>
-          // Return the error if any in scheduling the remote fetch task
-          warn("Unable to fetch data from remote storage", e)
-          return Some(createLogReadResult(e))
+          warn(s"Unable to fetch data from remote storage for remoteFetchInfo: $remoteFetchInfo", e)
+          // Store the error in RemoteLogReadResult if any in scheduling the remote fetch task.
+          // It will be sent back to the client in DelayedRemoteFetch along with other successful remote fetch results.
+          remoteFetchResult.complete(new RemoteLogReadResult(Optional.empty, Optional.of(e)))
+          delayedRemoteFetchPurgatory.checkAndComplete(key)

Review Comment:
   `DelayedRemoteFetch` is not instantiated at this point. Calling `purgatory#checkAndComplete` might complete an existing delayed operation in the purgatory (which is fine). Could you explain why we need this change?
   ```
   delayedRemoteFetchPurgatory.checkAndComplete(key)
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org