This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f909e1c87fe3b4f3068f97db3fafb82de223f6f7
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Jan 15 10:39:11 2026 +0200

    [fix][proxy] Fix memory leaks in ParserProxyHandler (#25142)
    
    (cherry picked from commit 99fdca8af6742dd36bec93ab0038fb572e6eeb0e)
---
 .../pulsar/proxy/server/DirectProxyHandler.java    |   9 +-
 .../pulsar/proxy/server/ParserProxyHandler.java    | 126 ++++++++++++++-------
 .../apache/pulsar/proxy/server/ProxyService.java   |  12 +-
 .../apache/pulsar/proxy/server/ProxyStatsTest.java |  58 ++++++++++
 4 files changed, 155 insertions(+), 50 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 5f4456d356e..f707abbc06a 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -426,26 +426,27 @@ public class DirectProxyHandler {
             } else {
                 // Enable parsing feature, proxyLogLevel(1 or 2)
                 // Add parser handler
+                ParserProxyHandler.Context parserContext = ParserProxyHandler.createContext();
                 if (connected.hasMaxMessageSize()) {
                     FrameDecoderUtil.replaceFrameDecoder(inboundChannel.pipeline(),
                             connected.getMaxMessageSize());
                     FrameDecoderUtil.replaceFrameDecoder(outboundChannel.pipeline(),
                             connected.getMaxMessageSize());
                     inboundChannel.pipeline().addBefore("handler", "inboundParser",
-                            new ParserProxyHandler(service,
+                            new ParserProxyHandler(parserContext, service,
                                     ParserProxyHandler.FRONTEND_CONN,
                                     connected.getMaxMessageSize(), outboundChannel.id()));
                     outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
-                            new ParserProxyHandler(service,
+                            new ParserProxyHandler(parserContext, service,
                                     ParserProxyHandler.BACKEND_CONN,
                                     connected.getMaxMessageSize(), inboundChannel.id()));
                 } else {
                     inboundChannel.pipeline().addBefore("handler", "inboundParser",
-                            new ParserProxyHandler(service,
+                            new ParserProxyHandler(parserContext, service,
                                     ParserProxyHandler.FRONTEND_CONN,
                                     Commands.DEFAULT_MAX_MESSAGE_SIZE, outboundChannel.id()));
                     outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
-                            new ParserProxyHandler(service,
+                            new ParserProxyHandler(parserContext, service,
                                     ParserProxyHandler.BACKEND_CONN,
                                     Commands.DEFAULT_MAX_MESSAGE_SIZE, inboundChannel.id()));
                 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
index 3a98311eb15..d3de3a2ff0a 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
@@ -31,11 +31,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import lombok.Getter;
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.raw.MessageParser;
 import org.apache.pulsar.common.api.raw.RawMessage;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.StringInterner;
 import org.apache.pulsar.proxy.stats.TopicStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,27 +55,41 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
 
     private final int maxMessageSize;
     private final ChannelId peerChannelId;
+    @Getter
+    private final Context context;
     private final ProxyService service;
 
 
-    /**
-     * producerid + channelid as key.
-     */
-    private static final Map<String, String> producerHashMap = new ConcurrentHashMap<>();
+    public static class Context {
+        /**
+         * producerid as key.
+         */
+        @Getter
+        private final Map<Long, String> producerIdToTopicName = new ConcurrentHashMap<>();
 
-    /**
-     * consumerid + channelid as key.
-     */
-    private static final Map<String, String> consumerHashMap = new ConcurrentHashMap<>();
+        /**
+         * consumerid as key.
+         */
+        @Getter
+        private final Map<Long, String> consumerIdToTopicName = new ConcurrentHashMap<>();
 
-    public ParserProxyHandler(ProxyService service, String type, int maxMessageSize,
+        private Context() {
+        }
+    }
+
+    public ParserProxyHandler(Context context, ProxyService service, String type, int maxMessageSize,
                               ChannelId peerChannelId) {
+        this.context = context;
         this.service = service;
         this.connType = type;
         this.maxMessageSize = maxMessageSize;
         this.peerChannelId = peerChannelId;
     }
 
+    public static Context createContext() {
+        return new Context();
+    }
+
     private void logging(Channel conn, BaseCommand.Type cmdtype, String info, List<RawMessage> messages) {
 
         if (messages != null) {
@@ -115,64 +131,86 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
 
             switch (cmd.getType()) {
                 case PRODUCER:
-                    ParserProxyHandler.producerHashMap.put(cmd.getProducer().getProducerId() + "," + ctx.channel().id(),
-                            cmd.getProducer().getTopic());
+                    topicName = StringInterner.intern(cmd.getProducer().getTopic());
+                    context.producerIdToTopicName.put(cmd.getProducer().getProducerId(), topicName);
 
                     String producerName = "";
                     if (cmd.getProducer().hasProducerName()){
                         producerName = cmd.getProducer().getProducerName();
                     }
                     logging(ctx.channel(), cmd.getType(), "{producer:" + producerName
-                            + ",topic:" + cmd.getProducer().getTopic() + "}", null);
+                            + ",topic:" + topicName + "}", null);
+                    break;
+                case CLOSE_PRODUCER:
+                    context.producerIdToTopicName.remove(cmd.getCloseProducer().getProducerId());
+                    logging(ctx.channel(), cmd.getType(), "", null);
                     break;
-
                 case SEND:
                     if (service.getProxyLogLevel() != 2) {
                         logging(ctx.channel(), cmd.getType(), "", null);
                         break;
                     }
-                    topicName = TopicName.toFullTopicName(ParserProxyHandler.producerHashMap.get(
-                            cmd.getSend().getProducerId() + "," + ctx.channel().id()));
-                    MutableLong msgBytes = new MutableLong(0);
-                    MessageParser.parseMessage(topicName, -1L,
-                            -1L, buffer, (message) -> {
-                                messages.add(message);
-                                msgBytes.add(message.getData().readableBytes());
-                            }, maxMessageSize);
-                    // update topic stats
-                    TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName,
-                        topic -> new TopicStats());
-                    topicStats.getMsgInRate().recordMultipleEvents(messages.size(), msgBytes.longValue());
-                    logging(ctx.channel(), cmd.getType(), "", messages);
+                    long producerId = cmd.getSend().getProducerId();
+                    String topicForProducer = context.producerIdToTopicName.get(producerId);
+                    if (topicForProducer != null) {
+                        topicName = TopicName.toFullTopicName(topicForProducer);
+                        MutableLong msgBytes = new MutableLong(0);
+                        MessageParser.parseMessage(topicName, -1L,
+                                -1L, buffer, (message) -> {
+                                    messages.add(message);
+                                    msgBytes.add(message.getData().readableBytes());
+                                }, maxMessageSize);
+                        // update topic stats
+                        TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName,
+                                topic -> new TopicStats());
+                        topicStats.getMsgInRate().recordMultipleEvents(messages.size(), msgBytes.longValue());
+                        logging(ctx.channel(), cmd.getType(), "", messages);
+                    } else {
+                        logging(ctx.channel(), cmd.getType(),
+                                "Cannot find topic name for producerId " + producerId, null);
+                    }
                     break;
 
                 case SUBSCRIBE:
-                    ParserProxyHandler.consumerHashMap.put(cmd.getSubscribe().getConsumerId() + ","
-                                    + ctx.channel().id(), cmd.getSubscribe().getTopic());
+                    topicName = StringInterner.intern(cmd.getSubscribe().getTopic());
+                    context.consumerIdToTopicName.put(cmd.getSubscribe().getConsumerId(), topicName);
 
                     logging(ctx.channel(), cmd.getType(), "{consumer:" + cmd.getSubscribe().getConsumerName()
-                            + ",topic:" + cmd.getSubscribe().getTopic() + "}", null);
+                            + ",topic:" + topicName + "}", null);
+                    break;
+                case CLOSE_CONSUMER:
+                    context.consumerIdToTopicName.remove(cmd.getCloseConsumer().getConsumerId());
+                    logging(ctx.channel(), cmd.getType(), "", null);
+                    break;
+                case UNSUBSCRIBE:
+                    context.consumerIdToTopicName.remove(cmd.getUnsubscribe().getConsumerId());
+                    logging(ctx.channel(), cmd.getType(), "", null);
                     break;
-
                 case MESSAGE:
                     if (service.getProxyLogLevel() != 2) {
                         logging(ctx.channel(), cmd.getType(), "", null);
                         break;
                     }
-                    topicName = TopicName.toFullTopicName(ParserProxyHandler.consumerHashMap.get(
-                            cmd.getMessage().getConsumerId() + "," + peerChannelId));
-
-                    msgBytes = new MutableLong(0);
-                    MessageParser.parseMessage(topicName, -1L,
-                            -1L, buffer, (message) -> {
-                                messages.add(message);
-                                msgBytes.add(message.getData().readableBytes());
-                            }, maxMessageSize);
-                    // update topic stats
-                    topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(),
-                            topic -> new TopicStats());
-                    topicStats.getMsgOutRate().recordMultipleEvents(messages.size(), msgBytes.longValue());
-                    logging(ctx.channel(), cmd.getType(), "", messages);
+                    long consumerId = cmd.getMessage().getConsumerId();
+                    String topicForConsumer = context.consumerIdToTopicName.get(consumerId);
+                    if (topicForConsumer != null) {
+                        topicName = TopicName.toFullTopicName(topicForConsumer);
+
+                        MutableLong msgBytes = new MutableLong(0);
+                        MessageParser.parseMessage(topicName, -1L,
+                                -1L, buffer, (message) -> {
+                                    messages.add(message);
+                                    msgBytes.add(message.getData().readableBytes());
+                                }, maxMessageSize);
+                        // update topic stats
+                        TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(),
+                                topic -> new TopicStats());
+                        topicStats.getMsgOutRate().recordMultipleEvents(messages.size(), msgBytes.longValue());
+                        logging(ctx.channel(), cmd.getType(), "", messages);
+                    } else {
+                        logging(ctx.channel(), cmd.getType(), "Cannot find topic name for consumerId " + consumerId,
+                                null);
+                    }
                     break;
 
                  default:
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 251e25a0e6b..670ac08dee9 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -55,7 +55,6 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import lombok.Getter;
-import lombok.Setter;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
@@ -116,7 +115,6 @@ public class ProxyService implements Closeable {
     protected final AtomicReference<Semaphore> lookupRequestSemaphore;
 
     @Getter
-    @Setter
     protected int proxyLogLevel;
 
     @Getter
@@ -590,4 +588,14 @@ public class ProxyService implements Closeable {
     public void setGracefulShutdown(boolean gracefulShutdown) {
         this.gracefulShutdown = gracefulShutdown;
     }
+
+    public void setProxyLogLevel(int proxyLogLevel) {
+        this.proxyLogLevel = proxyLogLevel;
+        // clear the topic stats when proxy log level is changed to < 2
+        // this is a way to avoid the proxy consuming too much memory when there are a lot of topics and log level
+        // has been temporarily set to 2
+        if (proxyLogLevel < 2) {
+            topicStats.clear();
+        }
+    }
 }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
index 17dac99d632..b6a0758b528 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
@@ -21,7 +21,9 @@ package org.apache.pulsar.proxy.server;
 import static java.util.Objects.requireNonNull;
 import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import java.util.List;
@@ -214,6 +216,62 @@ public class ProxyStatsTest extends MockedPulsarServiceBaseTest {
 
         consumer.close();
         consumer2.close();
+
+        // check that topic stats are cleared after setting proxy log level to 0
+        assertFalse(proxyService.getTopicStats().isEmpty());
+        proxyService.setProxyLogLevel(0);
+        assertTrue(proxyService.getTopicStats().isEmpty());
+    }
+
+    @Test
+    public void testMemoryLeakFixed() throws Exception {
+        proxyService.setProxyLogLevel(2);
+        final String topicName = "persistent://sample/test/local/topic-stats";
+        final String topicName2 = "persistent://sample/test/local/topic-stats-2";
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()).build();
+        Producer<byte[]> producer1 = client.newProducer(Schema.BYTES).topic(topicName).enableBatching(false)
+                .producerName("producer1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+
+        Producer<byte[]> producer2 = client.newProducer(Schema.BYTES).topic(topicName2).enableBatching(false)
+                .producerName("producer2").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+
+        Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
+        Consumer<byte[]> consumer2 = client.newConsumer().topic(topicName2).subscriptionName("my-sub")
+                .subscribe();
+
+        int totalMessages = 10;
+        for (int i = 0; i < totalMessages; i++) {
+            producer1.send("test".getBytes());
+            producer2.send("test".getBytes());
+        }
+
+        for (int i = 0; i < totalMessages; i++) {
+            Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
+            requireNonNull(msg);
+            consumer.acknowledge(msg);
+            msg = consumer2.receive(1, TimeUnit.SECONDS);
+        }
+
+        ParserProxyHandler.Context context = proxyService.getClientCnxs().stream().map(proxyConnection -> {
+            ParserProxyHandler parserProxyHandler = proxyConnection.ctx().pipeline().get(ParserProxyHandler.class);
+            return parserProxyHandler != null ? parserProxyHandler.getContext() : null;
+        }).filter(c -> c != null && !c.getConsumerIdToTopicName().isEmpty()).findFirst().get();
+
+        assertEquals(context.getConsumerIdToTopicName().size(), 2);
+        assertEquals(context.getProducerIdToTopicName().size(), 2);
+
+
+        consumer.close();
+        assertEquals(context.getConsumerIdToTopicName().size(), 1);
+        consumer2.close();
+        assertEquals(context.getConsumerIdToTopicName().size(), 0);
+
+        producer1.close();
+        assertEquals(context.getProducerIdToTopicName().size(), 1);
+        producer2.close();
+        assertEquals(context.getProducerIdToTopicName().size(), 0);
     }
 
     /**

Reply via email to