[ https://issues.apache.org/jira/browse/KAFKA-19763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18028323#comment-18028323 ]

Luke Chen commented on KAFKA-19763:
-----------------------------------

This patch fixes the problem, but it slows down the read throughput because cloning 
the buffer takes time. There should be a better solution.

 
{code:java}
--- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -22,6 +22,7 @@ import kafka.utils.Logging
 import org.apache.kafka.common.TopicIdPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.server.LogReadResult
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.purgatory.DelayedOperation
@@ -108,6 +109,7 @@ class DelayedRemoteFetch(remoteFetchTasks: util.Map[TopicIdPartition, Future[Voi
    */
   override def onComplete(): Unit = {
     val fetchPartitionData = localReadResults.map { case (tp, result) =>
+
       val remoteFetchResult = remoteFetchResults.get(tp)
       if (remoteFetchInfos.containsKey(tp)
         && remoteFetchResult.isDone
@@ -121,7 +123,8 @@ class DelayedRemoteFetch(remoteFetchTasks: util.Map[TopicIdPartition, Future[Voi
             result.error,
             result.highWatermark,
             result.leaderLogStartOffset,
-            info.records,
+            // clone the record buffer to release the memory
+            MemoryRecords.readableRecords(info.records.asInstanceOf[MemoryRecords].buffer()),
             Optional.empty(),
             if (result.lastStableOffset.isPresent) OptionalLong.of(result.lastStableOffset.getAsLong) else OptionalLong.empty(),
             info.abortedTransactions,
@@ -132,7 +135,8 @@ class DelayedRemoteFetch(remoteFetchTasks: util.Map[TopicIdPartition, Future[Voi
         tp -> result.toFetchPartitionData(false)
       }
     }
-
+    // clear the map to avoid memory leak
+    remoteFetchResults.clear()
     responseCallback(fetchPartitionData)
   }
 }{code}

> Parallel remote reads causes memory leak in broker
> --------------------------------------------------
>
>                 Key: KAFKA-19763
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19763
>             Project: Kafka
>          Issue Type: Task
>            Reporter: Kamal Chandraprakash
>            Assignee: Kamal Chandraprakash
>            Priority: Blocker
>             Fix For: 4.2.0
>
>         Attachments: RemoteReadMemoryLeakReproducer.java, Screenshot 
> 2025-10-07 at 8.25.45 PM.png
>
>
> The broker heap memory fills up and an OOM error is thrown when remote reads are 
> triggered for multiple partitions within a FETCH request. 
> Steps to reproduce: 
> 1. Start a single-node broker and configure LocalTieredStorage as the remote 
> storage. 
> 2. Create a topic with 5 partitions. 
> 3. Produce messages and ensure that a few segments are uploaded to remote storage.
> 4. Start a consumer to read from those 5 partitions. Seek to the beginning for 4 
> partitions and to the end for 1 partition, so that the FETCH request reads from 
> both the remote log and the local log (a reproduction sketch follows below).
> 5. The broker crashes with an OOM error.
> 6. The DelayedRemoteFetch / RemoteLogReadResult references are held by the 
> purgatory, which is why the broker crashes.
> cc [~showuon]
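
A minimal reproduction sketch of step 4 is shown below. This is not the attached 
RemoteReadMemoryLeakReproducer.java; the class name, topic name, and bootstrap 
address are assumptions. It only illustrates how a single FETCH request ends up 
mixing remote-log and local-log reads (4 partitions sought to the beginning, 1 to 
the end).

{code:java}
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class RemoteReadMemoryLeakSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // bootstrap address is an assumption for the local single-node broker
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // "tiered-topic" is a placeholder for the topic created in step 2
            List<TopicPartition> partitions = new ArrayList<>();
            for (int p = 0; p < 5; p++) {
                partitions.add(new TopicPartition("tiered-topic", p));
            }
            consumer.assign(partitions);
            // 4 partitions read from the beginning (served by the remote log),
            // 1 partition reads from the end (served by the local log)
            consumer.seekToBeginning(partitions.subList(0, 4));
            consumer.seekToEnd(partitions.subList(4, 5));
            while (true) {
                // keep fetching; the broker heap grows until it hits OOM
                consumer.poll(Duration.ofMillis(500));
            }
        }
    }
}
{code}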


