This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 58eda8c2c66 Fix can not enable system topic if 
`AutoUpdateSchemaEnabled=false`. (#15754)
58eda8c2c66 is described below

commit 58eda8c2c66961f5a3849eff5fb69084b1704971
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue May 24 06:17:04 2022 -0700

    Fix can not enable system topic if `AutoUpdateSchemaEnabled=false`. (#15754)
---
 .../apache/pulsar/broker/service/AbstractTopic.java  |  9 ++++++++-
 .../apache/pulsar/broker/service/BrokerService.java  |  2 +-
 .../SystemTopicBasedTopicPoliciesServiceTest.java    | 20 ++++++++++++++++++++
 3 files changed, 29 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index ed8279ad524..9cc17b700b2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -266,7 +266,7 @@ public abstract class AbstractTopic implements Topic {
         String base = TopicName.get(getName()).getPartitionedTopicName();
         String id = TopicName.get(base).getSchemaName();
         SchemaRegistryService schemaRegistryService = 
brokerService.pulsar().getSchemaRegistryService();
-        return isAllowAutoUpdateSchema ? schemaRegistryService
+        return allowAutoUpdateSchema() ? schemaRegistryService
                 .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy)
                 : 
schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList
 ->
                 
schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, 
schema)
@@ -604,4 +604,11 @@ public abstract class AbstractTopic implements Topic {
             enableProducerReadForPublishRateLimiting();
         }
     }
+
+    private boolean allowAutoUpdateSchema() {
+        if (brokerService.isSystemTopic(topic)) {
+            return true;
+        }
+        return isAllowAutoUpdateSchema;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d9c015ea9e4..47ecd6e7cbf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2463,7 +2463,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         log.debug("No autoSubscriptionCreateOverride policy found for {}", 
topicName);
         return null;
     }
-    private boolean isSystemTopic(String topic) {
+    public boolean isSystemTopic(String topic) {
         return SystemTopicClient.isSystemTopic(TopicName.get(topic));
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index c8068404e28..48f234edea3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -24,11 +24,13 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCach
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -228,4 +230,22 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
             }
         });
     }
+
+    @Test
+    public void testAutoCreateSchema() throws Exception {
+        String namespace = "system-topic/ns2";
+        String topic = namespace + "/test";
+        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().setIsAllowAutoUpdateSchema(namespace, false);
+        admin.topics().createNonPartitionedTopic(topic);
+        TopicName changeEventTopicName =
+                NamespaceEventsSystemTopicFactory.getSystemTopicName(
+                        TopicName.get(topic).getNamespaceObject(), 
EventType.TOPIC_POLICY);
+        Awaitility.await().untilAsserted(() -> {
+            SchemaInfo schemaInfo = admin
+                    .schemas()
+                    .getSchemaInfo(changeEventTopicName.toString());
+            Assert.assertNotNull(schemaInfo);
+        });
+    }
 }

Reply via email to