This is an automated email from the ASF dual-hosted git repository.
shibd pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 75494f05f79 [fix][broker][branch-4.2] URL-encode sub-name in Txn
pending-ack topic #25727 (#25728)
75494f05f79 is described below
commit 75494f05f79156f414552887a8a89a9193244bb7
Author: Baodi Shi <[email protected]>
AuthorDate: Sat May 9 16:11:21 2026 +0800
[fix][broker][branch-4.2] URL-encode sub-name in Txn pending-ack topic
#25727 (#25728)
---
.../broker/service/persistent/PersistentTopic.java | 3 +--
.../pendingack/impl/MLPendingAckStore.java | 11 ++++++--
.../pendingack/impl/MLPendingAckStoreTest.java | 30 ++++++++++++++++++++++
3 files changed, 40 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 03adb2a9c5e..e7a006185af 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1277,8 +1277,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
TopicName tn = TopicName.get(MLPendingAckStore
- .getTransactionPendingAckStoreSuffix(topic,
- Codec.encode(subscriptionName)));
+ .getTransactionPendingAckStoreSuffix(topic, subscriptionName));
if
(brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
ManagedLedgerConfig managedLedgerConfig = ledger.getConfig();
ManagedLedgerFactory managedLedgerFactory = getBrokerService()
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index 25c7727259d..b4f63bf64c1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -59,6 +59,7 @@ import
org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.transaction.coordinator.impl.TxnBatchedPositionImpl;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter;
import
org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
@@ -516,7 +517,13 @@ public class MLPendingAckStore implements PendingAckStore {
}
public static String getTransactionPendingAckStoreSuffix(String
originTopicName, String subName) {
- return TopicName.get(originTopicName) + "-" + subName +
SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
+ // URL-encode the subscription name so that any '/' characters it
contains do not create
+ // extra path segments when the resulting string is parsed as a topic
name. TopicName
+ // always decodes the local-name component on parse (via Codec.decode)
and re-encodes it
+ // on output (via getEncodedLocalName / getPersistenceNamingEncoding),
so encoding here
+ // produces a valid round-trip with no double-encoding.
+ String encodedSubName = Codec.encode(subName);
+ return TopicName.get(originTopicName) + "-" + encodedSubName +
SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
}
public static String getTransactionPendingAckStoreCursorName() {
@@ -569,4 +576,4 @@ public class MLPendingAckStore implements PendingAckStore {
return buf;
}
}
-}
\ No newline at end of file
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java
index 154cbe39d40..14e1ceade50 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java
@@ -45,6 +45,9 @@ import
org.apache.pulsar.broker.transaction.util.LogIndexLagBackoff;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import
org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.awaitility.Awaitility;
@@ -140,6 +143,33 @@ public class MLPendingAckStoreTest extends
TransactionTestBase {
return (MLPendingAckStore)
mlPendingAckStoreProvider.newPendingAckStore(persistentSubscriptionMock).get();
}
+ @Test
+ public void testPendingAckStoreWithSlashSubscriptionName() throws
Exception {
+ String slashSubName = "tenant/namespace/my-function";
+ when(persistentSubscriptionMock.getName()).thenReturn(slashSubName);
+
+ MLPendingAckStoreProvider provider = new MLPendingAckStoreProvider();
+
+ // Should not throw — subscription names containing '/' must be
URL-encoded so the
+ // resulting pending-ack topic name is a valid V2 persistent topic
name.
+ MLPendingAckStore store = (MLPendingAckStore)
provider.newPendingAckStore(persistentSubscriptionMock).get();
+
+ // Verify the managed ledger persistence path encodes the subscription
name correctly.
+ // Expected: tenant/namespace/persistent/<encodedLocalName>
+ // where localName =
"<topic>-<encodedSubName>__transaction_pending_ack"
+ String originTopicName =
persistentSubscriptionMock.getTopic().getName();
+ TopicName origin = TopicName.get(originTopicName);
+ String encodedSubName = Codec.encode(slashSubName);
+ String expectedLocalName = origin.getLocalName() + "-" + encodedSubName
+ + SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
+ // getPersistenceNamingEncoding() =
tenant/namespace/persistent/encodedLocalName
+ String expectedMlName = origin.getTenant() + "/" +
origin.getNamespacePortion()
+ + "/persistent/" + Codec.encode(expectedLocalName);
+ Assert.assertEquals(store.getManagedLedger().get().getName(),
expectedMlName);
+
+ closePendingAckStoreWithRetry(store);
+ }
+
/**
* Overridden cases:
* 1. Batched write and replay with batched feature.