This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 425280c39608aabfc028490cb1a44081beb72092
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Jul 2 09:51:16 2025 +0800

    [fix][proxy] Fix proxy OOM by replacing TopicName with a simple conversion 
method (#24465)
    
    (cherry picked from commit ae050b9c44ef0be474cedcf78c4dd6ba31cf6cc8)
---
 .../pulsar/common/api/raw/MessageParser.java       | 19 +++++-
 .../org/apache/pulsar/common/naming/TopicName.java | 75 ++++++++++++++++++++++
 .../apache/pulsar/common/naming/TopicNameTest.java | 34 +++++++++-
 .../pulsar/proxy/server/LookupProxyHandler.java    |  6 +-
 .../pulsar/proxy/server/ParserProxyHandler.java    | 14 ++--
 5 files changed, 133 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
index af516fa7534..0e9aae4603d 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
@@ -55,11 +55,17 @@ public class MessageParser {
         void process(RawMessage message) throws IOException;
     }
 
+    @Deprecated
+    public static void parseMessage(TopicName topicName, long ledgerId, long 
entryId, ByteBuf headersAndPayload,
+                                    MessageProcessor processor, int 
maxMessageSize) throws IOException {
+        parseMessage(topicName.toString(), ledgerId, entryId, 
headersAndPayload, processor, maxMessageSize);
+    }
+
     /**
      * Parse a raw Pulsar entry payload and extract all the individual message 
that may be included in the batch. The
      * provided {@link MessageProcessor} will be invoked for each individual 
message.
      */
-    public static void parseMessage(TopicName topicName, long ledgerId, long 
entryId, ByteBuf headersAndPayload,
+    public static void parseMessage(String topicName, long ledgerId, long 
entryId, ByteBuf headersAndPayload,
             MessageProcessor processor, int maxMessageSize) throws IOException 
{
         ByteBuf payload = headersAndPayload;
         ByteBuf uncompressedPayload = null;
@@ -117,7 +123,7 @@ public class MessageParser {
         }
     }
 
-    public static boolean verifyChecksum(TopicName topic, ByteBuf 
headersAndPayload, long ledgerId, long entryId) {
+    public static boolean verifyChecksum(String topic, ByteBuf 
headersAndPayload, long ledgerId, long entryId) {
         if (hasChecksum(headersAndPayload)) {
             int checksum = readChecksum(headersAndPayload);
             int computedChecksum = computeChecksum(headersAndPayload);
@@ -132,7 +138,14 @@ public class MessageParser {
         return true;
     }
 
-    public static ByteBuf uncompressPayloadIfNeeded(TopicName topic, 
MessageMetadata msgMetadata,
+    @Deprecated
+    public static ByteBuf uncompressPayloadIfNeeded(TopicName topicName, 
MessageMetadata msgMetadata,
+                                                    ByteBuf payload, long 
ledgerId, long entryId, int maxMessageSize) {
+        return uncompressPayloadIfNeeded(topicName.toString(), msgMetadata, 
payload, ledgerId, entryId,
+                maxMessageSize);
+    }
+
+    public static ByteBuf uncompressPayloadIfNeeded(String topic, 
MessageMetadata msgMetadata,
             ByteBuf payload, long ledgerId, long entryId, int maxMessageSize) {
         CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(msgMetadata.getCompression());
         int uncompressedSize = msgMetadata.getUncompressedSize();
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index aadb9d15af6..30c7da14459 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.common.naming;
 
 import com.google.common.base.Splitter;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -442,4 +443,78 @@ public class TopicName implements ServiceUnitId {
     public boolean isV2() {
         return cluster == null;
     }
+
+    /**
+     * Convert a topic name to a full topic name.
+     * In Pulsar, a full topic name is 
"<domain>://<tenant>/<namespace>/<local-topic>" (v2) or
+     * "<domain>://<tenant>/<cluster>/<namespace>/<local-topic>" (v1). 
However, for convenience, it's allowed for clients
+     * to pass a short topic name with v2 format:
+     * - "<local-topic>", which represents 
"persistent://public/default/<local-topic>"
+     * - "<tenant>/<namespace>/<local-topic>", which represents 
"persistent://<tenant>/<namespace>/<local-topic>"
+     *
+     * @param topic the topic name from client
+     * @return the full topic name.
+     */
+    public static String toFullTopicName(String topic) {
+        final int index = topic.indexOf("://");
+        if (index >= 0) {
+            TopicDomain.getEnum(topic.substring(0, index));
+            final List<String> parts = splitBySlash(topic.substring(index + 
"://".length()), 4);
+            if (parts.size() != 3 && parts.size() != 4) {
+                throw new IllegalArgumentException(topic + " is invalid");
+            }
+            if (parts.size() == 3) {
+                NamespaceName.validateNamespaceName(parts.get(0), 
parts.get(1));
+                if (StringUtils.isBlank(parts.get(2))) {
+                    throw new IllegalArgumentException(topic + " has blank 
local topic");
+                }
+            } else {
+                NamespaceName.validateNamespaceName(parts.get(0), 
parts.get(1), parts.get(2));
+                if (StringUtils.isBlank(parts.get(3))) {
+                    throw new IllegalArgumentException(topic + " has blank 
local topic");
+                }
+            }
+            return topic; // it's a valid full topic name
+        } else {
+            List<String> parts = splitBySlash(topic, 0);
+            if (parts.size() != 1 && parts.size() != 3) {
+                throw new IllegalArgumentException(topic + " is invalid");
+            }
+            if (parts.size() == 1) {
+                if (StringUtils.isBlank(parts.get(0))) {
+                    throw new IllegalArgumentException(topic + " has blank 
local topic");
+                }
+                return "persistent://public/default/" + parts.get(0);
+            } else {
+                NamespaceName.validateNamespaceName(parts.get(0), 
parts.get(1));
+                if (StringUtils.isBlank(parts.get(2))) {
+                    throw new IllegalArgumentException(topic + " has blank 
local topic");
+                }
+                return "persistent://" + topic;
+            }
+        }
+    }
+
+    private static List<String> splitBySlash(String topic, int limit) {
+        final List<String> tokens = new ArrayList<>(3);
+        final int loopCount = (limit <= 0) ? Integer.MAX_VALUE : limit - 1;
+        int beginIndex = 0;
+        for (int i = 0; i < loopCount; i++) {
+            final int endIndex = topic.indexOf('/', beginIndex);
+            if (endIndex < 0) {
+                tokens.add(topic.substring(beginIndex));
+                return tokens;
+            } else if (endIndex > beginIndex) {
+                tokens.add(topic.substring(beginIndex, endIndex));
+            } else {
+                throw new IllegalArgumentException("Invalid topic name " + 
topic);
+            }
+            beginIndex = endIndex + 1;
+        }
+        if (beginIndex >= topic.length()) {
+            throw new IllegalArgumentException("Invalid topic name " + topic);
+        }
+        tokens.add(topic.substring(beginIndex));
+        return tokens;
+    }
 }
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
index 27eb82d15af..bb4798fca46 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import org.apache.pulsar.common.util.Codec;
@@ -52,9 +53,8 @@ public class TopicNameTest {
 
         
assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic").toString(),
                 "persistent://tenant/cluster/namespace/topic");
-
-        
assertNotEquals(TopicName.get("persistent://tenant/cluster/namespace/topic"),
-            "persistent://tenant/cluster/namespace/topic");
+        
assertEquals(TopicName.toFullTopicName("persistent://tenant/cluster/namespace/topic"),
+                "persistent://tenant/cluster/namespace/topic");
 
         
assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic").getDomain(),
                 TopicDomain.persistent);
@@ -103,6 +103,7 @@ public class TopicNameTest {
         } catch (IllegalArgumentException e) {
             // Ok
         }
+        assertThrows(IllegalArgumentException.class, () -> 
TopicName.toFullTopicName("://tenant.namespace:my-topic"));
 
         try {
             TopicName.get("://tenant.namespace");
@@ -110,6 +111,7 @@ public class TopicNameTest {
         } catch (IllegalArgumentException e) {
             // Ok
         }
+        assertThrows(IllegalArgumentException.class, () -> 
TopicName.toFullTopicName("://tenant.namespace"));
 
         try {
             TopicName.get("invalid://tenant/cluster/namespace/topic");
@@ -117,6 +119,8 @@ public class TopicNameTest {
         } catch (IllegalArgumentException e) {
             // Ok
         }
+        assertThrows(IllegalArgumentException.class,
+                () -> 
TopicName.toFullTopicName("invalid://tenant/cluster/namespace/topic"));
 
         try {
             TopicName.get("tenant/cluster/namespace/topic");
@@ -124,6 +128,7 @@ public class TopicNameTest {
         } catch (IllegalArgumentException e) {
             // Ok
         }
+        assertThrows(IllegalArgumentException.class, () -> 
TopicName.toFullTopicName("tenant/cluster/namespace/topic"));
 
         try {
             TopicName.get("persistent:///cluster/namespace/mydest-1");
@@ -131,6 +136,8 @@ public class TopicNameTest {
         } catch (IllegalArgumentException e) {
             // Ok
         }
+        assertThrows(IllegalArgumentException.class,
+                () -> 
TopicName.toFullTopicName("persistent:///cluster/namespace/mydest-1"));
 
         try {
             TopicName.get("persistent://pulsar//namespace/mydest-1");
@@ -138,6 +145,8 @@ public class TopicNameTest {
         } catch (IllegalArgumentException e) {
             // Ok
         }
+        assertThrows(IllegalArgumentException.class,
+                () -> 
TopicName.toFullTopicName("persistent://pulsar//namespace/mydest-1"));
 
         try {
             TopicName.get("persistent://pulsar/cluster//mydest-1");
@@ -145,6 +154,8 @@ public class TopicNameTest {
         } catch (IllegalArgumentException e) {
             // Ok
         }
+        assertThrows(IllegalArgumentException.class,
+                () -> 
TopicName.toFullTopicName("persistent://pulsar/cluster//mydest-1"));
 
         try {
             TopicName.get("persistent://pulsar/cluster/namespace/");
@@ -152,6 +163,8 @@ public class TopicNameTest {
         } catch (IllegalArgumentException e) {
             // Ok
         }
+        assertThrows(IllegalArgumentException.class,
+                () -> 
TopicName.toFullTopicName("persistent://pulsar/cluster/namespace/"));
 
         try {
             TopicName.get("://pulsar/cluster/namespace/");
@@ -159,6 +172,7 @@ public class TopicNameTest {
         } catch (IllegalArgumentException e) {
             // Ok
         }
+        assertThrows(IllegalArgumentException.class, () -> 
TopicName.toFullTopicName("://pulsar/cluster/namespace/"));
 
         
assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic")
                 .getPersistenceNamingEncoding(), 
"tenant/cluster/namespace/persistent/topic");
@@ -169,6 +183,7 @@ public class TopicNameTest {
         } catch (IllegalArgumentException e) {
             // Ok
         }
+        assertThrows(IllegalArgumentException.class, () -> 
TopicName.toFullTopicName("://tenant.namespace"));
 
         try {
             TopicName.get("://tenant/cluster/namespace");
@@ -176,6 +191,7 @@ public class TopicNameTest {
         } catch (IllegalArgumentException e) {
             // Ok
         }
+        assertThrows(IllegalArgumentException.class, () -> 
TopicName.toFullTopicName("://tenant//cluster/namespace"));
 
         try {
             TopicName.get(" ");
@@ -183,6 +199,7 @@ public class TopicNameTest {
         } catch (IllegalArgumentException e) {
             // Ok
         }
+        assertThrows(IllegalArgumentException.class, () -> 
TopicName.toFullTopicName(" "));
 
         TopicName nameWithSlash = 
TopicName.get("persistent://tenant/cluster/namespace/ns-abc/table/1");
         assertEquals(nameWithSlash.getEncodedLocalName(), 
Codec.encode("ns-abc/table/1"));
@@ -344,4 +361,15 @@ public class TopicNameTest {
         assertNotEquals(tp2.toString(), tp1.toString());
         assertEquals(tp2.toString(), 
"persistent://tenant1/namespace1/tp1-partition-0-DLQ-partition-0");
     }
+
+    @Test
+    public void testToFullTopicName() {
+        // There is no constraint for local topic name
+        assertEquals("persistent://public/default/tp???xx=", 
TopicName.toFullTopicName("tp???xx="));
+        assertEquals("persistent://tenant/ns/tp???xx=", 
TopicName.toFullTopicName("tenant/ns/tp???xx="));
+        assertEquals("persistent://tenant/ns/test", 
TopicName.toFullTopicName("persistent://tenant/ns/test"));
+        assertThrows(IllegalArgumentException.class, () -> 
TopicName.toFullTopicName("ns/topic"));
+        // v1 format is not supported when the domain is not included
+        assertThrows(IllegalArgumentException.class, () -> 
TopicName.toFullTopicName("tenant/cluster/ns/topic"));
+    }
 }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 75109883f98..c38e2ba08c1 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -221,7 +221,7 @@ public class LookupProxyHandler {
      **/
     private void 
handlePartitionMetadataResponse(CommandPartitionedTopicMetadata 
partitionMetadata,
             long clientRequestId) {
-        TopicName topicName = TopicName.get(partitionMetadata.getTopic());
+        String topicName = 
TopicName.toFullTopicName(partitionMetadata.getTopic());
 
         String serviceUrl = getBrokerServiceUrl(clientRequestId);
         if (serviceUrl == null) {
@@ -235,7 +235,7 @@ public class LookupProxyHandler {
 
         if (log.isDebugEnabled()) {
             log.debug("Getting connections to '{}' for Looking up topic '{}' 
with clientReq Id '{}'", addr,
-                    topicName.getPartitionedTopicName(), clientRequestId);
+                    topicName, clientRequestId);
         }
         
proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> 
{
             // Connected to backend broker
@@ -245,7 +245,7 @@ public class LookupProxyHandler {
                     partitionMetadata.isMetadataAutoCreationEnabled());
             clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
                 if (t != null) {
-                    log.warn("[{}] failed to get Partitioned metadata : {}", 
topicName.toString(),
+                    log.warn("[{}] failed to get Partitioned metadata : {}", 
topicName,
                         t.getMessage(), t);
                     PulsarClientException pce = 
PulsarClientException.unwrap(t);
                     
writeAndFlush(Commands.newLookupErrorResponse(clientCnx.revertClientExToErrorCode(pce),
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
index 22957c9599f..3a98311eb15 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
@@ -101,7 +101,8 @@ public class ParserProxyHandler extends 
ChannelInboundHandlerAdapter {
     private final BaseCommand cmd = new BaseCommand();
 
     public void channelRead(ChannelHandlerContext ctx, Object msg) {
-        TopicName topicName;
+        String key;
+        String topicName;
         List<RawMessage> messages = new ArrayList<>();
         ByteBuf buffer = (ByteBuf) (msg);
 
@@ -130,8 +131,8 @@ public class ParserProxyHandler extends 
ChannelInboundHandlerAdapter {
                         logging(ctx.channel(), cmd.getType(), "", null);
                         break;
                     }
-                    topicName = 
TopicName.get(ParserProxyHandler.producerHashMap.get(cmd.getSend().getProducerId()
 + ","
-                            + ctx.channel().id()));
+                    topicName = 
TopicName.toFullTopicName(ParserProxyHandler.producerHashMap.get(
+                            cmd.getSend().getProducerId() + "," + 
ctx.channel().id()));
                     MutableLong msgBytes = new MutableLong(0);
                     MessageParser.parseMessage(topicName, -1L,
                             -1L, buffer, (message) -> {
@@ -139,7 +140,7 @@ public class ParserProxyHandler extends 
ChannelInboundHandlerAdapter {
                                 
msgBytes.add(message.getData().readableBytes());
                             }, maxMessageSize);
                     // update topic stats
-                    TopicStats topicStats = 
this.service.getTopicStats().computeIfAbsent(topicName.toString(),
+                    TopicStats topicStats = 
this.service.getTopicStats().computeIfAbsent(topicName,
                         topic -> new TopicStats());
                     
topicStats.getMsgInRate().recordMultipleEvents(messages.size(), 
msgBytes.longValue());
                     logging(ctx.channel(), cmd.getType(), "", messages);
@@ -158,8 +159,9 @@ public class ParserProxyHandler extends 
ChannelInboundHandlerAdapter {
                         logging(ctx.channel(), cmd.getType(), "", null);
                         break;
                     }
-                    topicName = 
TopicName.get(ParserProxyHandler.consumerHashMap.get(cmd.getMessage().getConsumerId()
-                            + "," + peerChannelId));
+                    topicName = 
TopicName.toFullTopicName(ParserProxyHandler.consumerHashMap.get(
+                            cmd.getMessage().getConsumerId() + "," + 
peerChannelId));
+
                     msgBytes = new MutableLong(0);
                     MessageParser.parseMessage(topicName, -1L,
                             -1L, buffer, (message) -> {

Reply via email to