This is an automated email from the ASF dual-hosted git repository.

shibd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e2df4ec293 [fix][broker] URL-encode sub-name in Txn pending-ack topic 
 (#25727)
4e2df4ec293 is described below

commit 4e2df4ec293009ce281fe22a83676991b0d16f8f
Author: Baodi Shi <[email protected]>
AuthorDate: Sat May 9 14:27:17 2026 +0800

    [fix][broker] URL-encode sub-name in Txn pending-ack topic (#25727)
---
 .../pulsar/broker/admin/v2/ScalableTopics.java     |  1 +
 .../broker/service/persistent/PersistentTopic.java |  3 +--
 .../pendingack/impl/MLPendingAckStore.java         | 11 ++++++--
 .../pendingack/impl/MLPendingAckStoreTest.java     | 30 ++++++++++++++++++++++
 4 files changed, 41 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
index 6b73fdf00d6..81a55da7f70 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.scalable.SegmentInfo;
 import org.apache.pulsar.common.scalable.SegmentTopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index bf795e85c3d..6f0a8cea54a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1331,8 +1331,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
 
         TopicName tn = TopicName.get(MLPendingAckStore
-                .getTransactionPendingAckStoreSuffix(topic,
-                        Codec.encode(subscriptionName)));
+                .getTransactionPendingAckStoreSuffix(topic, subscriptionName));
         if 
(brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
             ManagedLedgerConfig managedLedgerConfig = ledger.getConfig();
                 ManagedLedgerFactory managedLedgerFactory = getBrokerService()
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index e10a93cbc1a..c3524e1c2ea 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -60,6 +60,7 @@ import 
org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.transaction.coordinator.impl.TxnBatchedPositionImpl;
 import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter;
 import 
org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
@@ -524,6 +525,12 @@ public class MLPendingAckStore implements PendingAckStore {
 
     public static String getTransactionPendingAckStoreSuffix(String 
originTopicName, String subName) {
         TopicName origin = TopicName.get(originTopicName);
+        // URL-encode the subscription name so that any '/' characters it 
contains do not create
+        // extra path segments when the resulting string is parsed as a topic 
name.  TopicName
+        // always decodes the local-name component on parse (via Codec.decode) 
and re-encodes it
+        // on output (via getEncodedLocalName / getPersistenceNamingEncoding), 
so encoding here
+        // produces a valid round-trip with no double-encoding.
+        String encodedSubName = Codec.encode(subName);
         // Segment topics 
("segment://tenant/ns/topic/<hexStart>-<hexEnd>-<segmentId>") cannot
         // host a derived pending-ack topic in the segment domain — the 
descriptor parser would
         // reject any name with extra dashes appended. Map to a flat 
persistent topic in the same
@@ -532,9 +539,9 @@ public class MLPendingAckStore implements PendingAckStore {
             return String.format("persistent://%s/%s/%s-%s-%s%s",
                     origin.getTenant(), origin.getNamespacePortion(),
                     origin.getLocalName(), origin.getSegmentDescriptor(),
-                    subName, SystemTopicNames.PENDING_ACK_STORE_SUFFIX);
+                    encodedSubName, SystemTopicNames.PENDING_ACK_STORE_SUFFIX);
         }
-        return origin + "-" + subName + 
SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
+        return origin + "-" + encodedSubName + 
SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
     }
 
     public static String getTransactionPendingAckStoreCursorName() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java
index b41a9feaefc..b20e44d6dc0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java
@@ -45,6 +45,9 @@ import 
org.apache.pulsar.broker.transaction.util.LogIndexLagBackoff;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandAck;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import 
org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
 import org.awaitility.Awaitility;
@@ -140,6 +143,33 @@ public class MLPendingAckStoreTest extends 
TransactionTestBase {
         return (MLPendingAckStore) 
mlPendingAckStoreProvider.newPendingAckStore(persistentSubscriptionMock).get();
     }
 
+    @Test
+    public void testPendingAckStoreWithSlashSubscriptionName() throws 
Exception {
+        String slashSubName = "tenant/namespace/my-function";
+        when(persistentSubscriptionMock.getName()).thenReturn(slashSubName);
+
+        MLPendingAckStoreProvider provider = new MLPendingAckStoreProvider();
+
+        // Should not throw — subscription names containing '/' must be 
URL-encoded so the
+        // resulting pending-ack topic name is a valid V2 persistent topic 
name.
+        MLPendingAckStore store = (MLPendingAckStore) 
provider.newPendingAckStore(persistentSubscriptionMock).get();
+
+        // Verify the managed ledger persistence path encodes the subscription 
name correctly.
+        // Expected: tenant/namespace/persistent/<encodedLocalName>
+        // where localName = 
"<topic>-<encodedSubName>__transaction_pending_ack"
+        String originTopicName = 
persistentSubscriptionMock.getTopic().getName();
+        TopicName origin = TopicName.get(originTopicName);
+        String encodedSubName = Codec.encode(slashSubName);
+        String expectedLocalName = origin.getLocalName() + "-" + encodedSubName
+                + SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
+        // getPersistenceNamingEncoding() = 
tenant/namespace/persistent/encodedLocalName
+        String expectedMlName = origin.getTenant() + "/" + 
origin.getNamespacePortion()
+                + "/persistent/" + Codec.encode(expectedLocalName);
+        Assert.assertEquals(store.getManagedLedger().get().getName(), 
expectedMlName);
+
+        closePendingAckStoreWithRetry(store);
+    }
+
     /**
      * Overridden cases:
      *   1. Batched write and replay with batched feature.

Reply via email to