This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 5f38eaff282 [fix][broker] partitioned __change_events topic is policy
topic (#20392)
5f38eaff282 is described below
commit 5f38eaff282c890d19902e02adc512e8bfb2a788
Author: Michael Marshall <[email protected]>
AuthorDate: Thu May 25 03:46:31 2023 -0500
[fix][broker] partitioned __change_events topic is policy topic (#20392)
(cherry picked from commit 9918bced4465e0b0746a7959550c90cb76ae945f)
---
.../broker/service/persistent/PersistentTopic.java | 2 +-
.../pulsar/common/naming/SystemTopicNames.java | 6 +--
.../pulsar/common/naming/SystemTopicNamesTest.java | 47 ++++++++++++++++++++++
3 files changed, 51 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 431be4d0811..22cd0b7d123 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -712,7 +712,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
try {
- if
(!topic.endsWith(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)
+ if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
&& !checkSubscriptionTypesEnable(subType)) {
return FutureUtil.failedFuture(
new NotAllowedException("Topic[{" + topic + "}]
doesn't support "
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
index eaab8261460..dd78ed804f6 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
@@ -71,7 +71,7 @@ public class SystemTopicNames {
if (topic == null) {
return false;
}
- return
TopicName.get(topic).getLocalName().equals(NAMESPACE_EVENTS_LOCAL_NAME);
+ return
TopicName.getPartitionedTopicName(topic).getLocalName().equals(NAMESPACE_EVENTS_LOCAL_NAME);
}
public static boolean isTransactionInternalName(TopicName topicName) {
@@ -82,7 +82,7 @@ public class SystemTopicNames {
}
public static boolean isSystemTopic(TopicName topicName) {
- TopicName nonePartitionedTopicName =
TopicName.get(topicName.getPartitionedTopicName());
- return isEventSystemTopic(nonePartitionedTopicName) ||
isTransactionInternalName(nonePartitionedTopicName);
+ TopicName nonPartitionedTopicName =
TopicName.get(topicName.getPartitionedTopicName());
+ return isEventSystemTopic(nonPartitionedTopicName) ||
isTransactionInternalName(nonPartitionedTopicName);
}
}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java
new file mode 100644
index 00000000000..f919ab02b54
--- /dev/null
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import static org.testng.AssertJUnit.assertEquals;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test
+public class SystemTopicNamesTest {
+
+ @DataProvider(name = "topicPoliciesSystemTopicNames")
+ public static Object[][] topicPoliciesSystemTopicNames() {
+ return new Object[][] {
+ {"persistent://public/default/__change_events", true},
+ {"persistent://public/default/__change_events-partition-0",
true},
+ {"persistent://random-tenant/random-ns/__change_events", true},
+
{"persistent://random-tenant/random-ns/__change_events-partition-1", true},
+ {"persistent://public/default/not_really__change_events",
false},
+ {"persistent://public/default/__change_events-diff-suffix",
false},
+ {"persistent://a/b/not_really__change_events", false},
+ };
+ }
+
+ @Test(dataProvider = "topicPoliciesSystemTopicNames")
+ public void testIsTopicPoliciesSystemTopic(String topicName, boolean
expectedResult) {
+ assertEquals(expectedResult,
SystemTopicNames.isTopicPoliciesSystemTopic(topicName));
+ assertEquals(expectedResult,
SystemTopicNames.isSystemTopic(TopicName.get(topicName)));
+ assertEquals(expectedResult,
SystemTopicNames.isEventSystemTopic(TopicName.get(topicName)));
+ }
+}