This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 425280c39608aabfc028490cb1a44081beb72092 Author: Yunze Xu <[email protected]> AuthorDate: Wed Jul 2 09:51:16 2025 +0800 [fix][proxy] Fix proxy OOM by replacing TopicName with a simple conversion method (#24465) (cherry picked from commit ae050b9c44ef0be474cedcf78c4dd6ba31cf6cc8) --- .../pulsar/common/api/raw/MessageParser.java | 19 +++++- .../org/apache/pulsar/common/naming/TopicName.java | 75 ++++++++++++++++++++++ .../apache/pulsar/common/naming/TopicNameTest.java | 34 +++++++++- .../pulsar/proxy/server/LookupProxyHandler.java | 6 +- .../pulsar/proxy/server/ParserProxyHandler.java | 14 ++-- 5 files changed, 133 insertions(+), 15 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java index af516fa7534..0e9aae4603d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java @@ -55,11 +55,17 @@ public class MessageParser { void process(RawMessage message) throws IOException; } + @Deprecated + public static void parseMessage(TopicName topicName, long ledgerId, long entryId, ByteBuf headersAndPayload, + MessageProcessor processor, int maxMessageSize) throws IOException { + parseMessage(topicName.toString(), ledgerId, entryId, headersAndPayload, processor, maxMessageSize); + } + /** * Parse a raw Pulsar entry payload and extract all the individual message that may be included in the batch. The * provided {@link MessageProcessor} will be invoked for each individual message. 
*/ - public static void parseMessage(TopicName topicName, long ledgerId, long entryId, ByteBuf headersAndPayload, + public static void parseMessage(String topicName, long ledgerId, long entryId, ByteBuf headersAndPayload, MessageProcessor processor, int maxMessageSize) throws IOException { ByteBuf payload = headersAndPayload; ByteBuf uncompressedPayload = null; @@ -117,7 +123,7 @@ public class MessageParser { } } - public static boolean verifyChecksum(TopicName topic, ByteBuf headersAndPayload, long ledgerId, long entryId) { + public static boolean verifyChecksum(String topic, ByteBuf headersAndPayload, long ledgerId, long entryId) { if (hasChecksum(headersAndPayload)) { int checksum = readChecksum(headersAndPayload); int computedChecksum = computeChecksum(headersAndPayload); @@ -132,7 +138,14 @@ public class MessageParser { return true; } - public static ByteBuf uncompressPayloadIfNeeded(TopicName topic, MessageMetadata msgMetadata, + @Deprecated + public static ByteBuf uncompressPayloadIfNeeded(TopicName topicName, MessageMetadata msgMetadata, + ByteBuf payload, long ledgerId, long entryId, int maxMessageSize) { + return uncompressPayloadIfNeeded(topicName.toString(), msgMetadata, payload, ledgerId, entryId, + maxMessageSize); + } + + public static ByteBuf uncompressPayloadIfNeeded(String topic, MessageMetadata msgMetadata, ByteBuf payload, long ledgerId, long entryId, int maxMessageSize) { CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(msgMetadata.getCompression()); int uncompressedSize = msgMetadata.getUncompressedSize(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index aadb9d15af6..30c7da14459 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -20,6 +20,7 @@ package org.apache.pulsar.common.naming; 
import com.google.common.base.Splitter; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -442,4 +443,78 @@ public class TopicName implements ServiceUnitId { public boolean isV2() { return cluster == null; } + + /** + * Convert a topic name to a full topic name. + * In Pulsar, a full topic name is "<domain>://<tenant>/<namespace>/<local-topic>" (v2) or + * "<domain>://<tenant>/<cluster>/<namespace>/<local-topic>" (v1). However, for convenience, clients are allowed + * to pass a short topic name in the v2 format: + * - "<local-topic>", which represents "persistent://public/default/<local-topic>" + * - "<tenant>/<namespace>/<local-topic>", which represents "persistent://<tenant>/<namespace>/<local-topic>" + * + * @param topic the topic name from client + * @return the full topic name. + */ + public static String toFullTopicName(String topic) { + final int index = topic.indexOf("://"); + if (index >= 0) { + TopicDomain.getEnum(topic.substring(0, index)); + final List<String> parts = splitBySlash(topic.substring(index + "://".length()), 4); + if (parts.size() != 3 && parts.size() != 4) { + throw new IllegalArgumentException(topic + " is invalid"); + } + if (parts.size() == 3) { + NamespaceName.validateNamespaceName(parts.get(0), parts.get(1)); + if (StringUtils.isBlank(parts.get(2))) { + throw new IllegalArgumentException(topic + " has blank local topic"); + } + } else { + NamespaceName.validateNamespaceName(parts.get(0), parts.get(1), parts.get(2)); + if (StringUtils.isBlank(parts.get(3))) { + throw new IllegalArgumentException(topic + " has blank local topic"); + } + } + return topic; // it's a valid full topic name + } else { + List<String> parts = splitBySlash(topic, 0); + if (parts.size() != 1 && parts.size() != 3) { + throw new IllegalArgumentException(topic + " is invalid"); + } + if (parts.size() == 1) { + if 
(StringUtils.isBlank(parts.get(0))) { + throw new IllegalArgumentException(topic + " has blank local topic"); + } + return "persistent://public/default/" + parts.get(0); + } else { + NamespaceName.validateNamespaceName(parts.get(0), parts.get(1)); + if (StringUtils.isBlank(parts.get(2))) { + throw new IllegalArgumentException(topic + " has blank local topic"); + } + return "persistent://" + topic; + } + } + } + + private static List<String> splitBySlash(String topic, int limit) { + final List<String> tokens = new ArrayList<>(3); + final int loopCount = (limit <= 0) ? Integer.MAX_VALUE : limit - 1; + int beginIndex = 0; + for (int i = 0; i < loopCount; i++) { + final int endIndex = topic.indexOf('/', beginIndex); + if (endIndex < 0) { + tokens.add(topic.substring(beginIndex)); + return tokens; + } else if (endIndex > beginIndex) { + tokens.add(topic.substring(beginIndex, endIndex)); + } else { + throw new IllegalArgumentException("Invalid topic name " + topic); + } + beginIndex = endIndex + 1; + } + if (beginIndex >= topic.length()) { + throw new IllegalArgumentException("Invalid topic name " + topic); + } + tokens.add(topic.substring(beginIndex)); + return tokens; + } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java index 27eb82d15af..bb4798fca46 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import org.apache.pulsar.common.util.Codec; @@ -52,9 +53,8 @@ public class TopicNameTest { 
assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic").toString(), "persistent://tenant/cluster/namespace/topic"); - - assertNotEquals(TopicName.get("persistent://tenant/cluster/namespace/topic"), - "persistent://tenant/cluster/namespace/topic"); + assertEquals(TopicName.toFullTopicName("persistent://tenant/cluster/namespace/topic"), + "persistent://tenant/cluster/namespace/topic"); assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic").getDomain(), TopicDomain.persistent); @@ -103,6 +103,7 @@ public class TopicNameTest { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://tenant.namespace:my-topic")); try { TopicName.get("://tenant.namespace"); @@ -110,6 +111,7 @@ public class TopicNameTest { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://tenant.namespace")); try { TopicName.get("invalid://tenant/cluster/namespace/topic"); @@ -117,6 +119,8 @@ public class TopicNameTest { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, + () -> TopicName.toFullTopicName("invalid://tenant/cluster/namespace/topic")); try { TopicName.get("tenant/cluster/namespace/topic"); @@ -124,6 +128,7 @@ public class TopicNameTest { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("tenant/cluster/namespace/topic")); try { TopicName.get("persistent:///cluster/namespace/mydest-1"); @@ -131,6 +136,8 @@ public class TopicNameTest { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, + () -> TopicName.toFullTopicName("persistent:///cluster/namespace/mydest-1")); try { TopicName.get("persistent://pulsar//namespace/mydest-1"); @@ -138,6 +145,8 @@ public class TopicNameTest { } catch (IllegalArgumentException e) { // Ok } + 
assertThrows(IllegalArgumentException.class, + () -> TopicName.toFullTopicName("persistent://pulsar//namespace/mydest-1")); try { TopicName.get("persistent://pulsar/cluster//mydest-1"); @@ -145,6 +154,8 @@ public class TopicNameTest { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, + () -> TopicName.toFullTopicName("persistent://pulsar/cluster//mydest-1")); try { TopicName.get("persistent://pulsar/cluster/namespace/"); @@ -152,6 +163,8 @@ public class TopicNameTest { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, + () -> TopicName.toFullTopicName("persistent://pulsar/cluster/namespace/")); try { TopicName.get("://pulsar/cluster/namespace/"); @@ -159,6 +172,7 @@ public class TopicNameTest { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://pulsar/cluster/namespace/")); assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic") .getPersistenceNamingEncoding(), "tenant/cluster/namespace/persistent/topic"); @@ -169,6 +183,7 @@ public class TopicNameTest { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://tenant.namespace")); try { TopicName.get("://tenant/cluster/namespace"); @@ -176,6 +191,7 @@ public class TopicNameTest { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://tenant//cluster/namespace")); try { TopicName.get(" "); @@ -183,6 +199,7 @@ public class TopicNameTest { } catch (IllegalArgumentException e) { // Ok } + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName(" ")); TopicName nameWithSlash = TopicName.get("persistent://tenant/cluster/namespace/ns-abc/table/1"); assertEquals(nameWithSlash.getEncodedLocalName(), Codec.encode("ns-abc/table/1")); @@ -344,4 +361,15 @@ public class 
TopicNameTest { assertNotEquals(tp2.toString(), tp1.toString()); assertEquals(tp2.toString(), "persistent://tenant1/namespace1/tp1-partition-0-DLQ-partition-0"); } + + @Test + public void testToFullTopicName() { + // There is no constraint for local topic name + assertEquals("persistent://public/default/tp???xx=", TopicName.toFullTopicName("tp???xx=")); + assertEquals("persistent://tenant/ns/tp???xx=", TopicName.toFullTopicName("tenant/ns/tp???xx=")); + assertEquals("persistent://tenant/ns/test", TopicName.toFullTopicName("persistent://tenant/ns/test")); + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("ns/topic")); + // v1 format is not supported when the domain is not included + assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("tenant/cluster/ns/topic")); + } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index 75109883f98..c38e2ba08c1 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -221,7 +221,7 @@ public class LookupProxyHandler { **/ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata, long clientRequestId) { - TopicName topicName = TopicName.get(partitionMetadata.getTopic()); + String topicName = TopicName.toFullTopicName(partitionMetadata.getTopic()); String serviceUrl = getBrokerServiceUrl(clientRequestId); if (serviceUrl == null) { @@ -235,7 +235,7 @@ public class LookupProxyHandler { if (log.isDebugEnabled()) { log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr, - topicName.getPartitionedTopicName(), clientRequestId); + topicName, clientRequestId); } proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> { // Connected to 
backend broker @@ -245,7 +245,7 @@ public class LookupProxyHandler { partitionMetadata.isMetadataAutoCreationEnabled()); clientCnx.newLookup(command, requestId).whenComplete((r, t) -> { if (t != null) { - log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(), + log.warn("[{}] failed to get Partitioned metadata : {}", topicName, t.getMessage(), t); PulsarClientException pce = PulsarClientException.unwrap(t); writeAndFlush(Commands.newLookupErrorResponse(clientCnx.revertClientExToErrorCode(pce), diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java index 22957c9599f..3a98311eb15 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java @@ -101,7 +101,8 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter { private final BaseCommand cmd = new BaseCommand(); public void channelRead(ChannelHandlerContext ctx, Object msg) { - TopicName topicName; + String key; + String topicName; List<RawMessage> messages = new ArrayList<>(); ByteBuf buffer = (ByteBuf) (msg); @@ -130,8 +131,8 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter { logging(ctx.channel(), cmd.getType(), "", null); break; } - topicName = TopicName.get(ParserProxyHandler.producerHashMap.get(cmd.getSend().getProducerId() + "," - + ctx.channel().id())); + topicName = TopicName.toFullTopicName(ParserProxyHandler.producerHashMap.get( + cmd.getSend().getProducerId() + "," + ctx.channel().id())); MutableLong msgBytes = new MutableLong(0); MessageParser.parseMessage(topicName, -1L, -1L, buffer, (message) -> { @@ -139,7 +140,7 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter { msgBytes.add(message.getData().readableBytes()); }, maxMessageSize); // update topic stats - TopicStats 
topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(), + TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName, topic -> new TopicStats()); topicStats.getMsgInRate().recordMultipleEvents(messages.size(), msgBytes.longValue()); logging(ctx.channel(), cmd.getType(), "", messages); @@ -158,8 +159,9 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter { logging(ctx.channel(), cmd.getType(), "", null); break; } - topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(cmd.getMessage().getConsumerId() - + "," + peerChannelId)); + topicName = TopicName.toFullTopicName(ParserProxyHandler.consumerHashMap.get( + cmd.getMessage().getConsumerId() + "," + peerChannelId)); + msgBytes = new MutableLong(0); MessageParser.parseMessage(topicName, -1L, -1L, buffer, (message) -> {
