This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 58eda8c2c66 Fix can not enable system topic if
`AutoUpdateSchemaEnabled=false`. (#15754)
58eda8c2c66 is described below
commit 58eda8c2c66961f5a3849eff5fb69084b1704971
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue May 24 06:17:04 2022 -0700
Fix can not enable system topic if `AutoUpdateSchemaEnabled=false`. (#15754)
---
.../apache/pulsar/broker/service/AbstractTopic.java | 9 ++++++++-
.../apache/pulsar/broker/service/BrokerService.java | 2 +-
.../SystemTopicBasedTopicPoliciesServiceTest.java | 20 ++++++++++++++++++++
3 files changed, 29 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index ed8279ad524..9cc17b700b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -266,7 +266,7 @@ public abstract class AbstractTopic implements Topic {
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
- return isAllowAutoUpdateSchema ? schemaRegistryService
+ return allowAutoUpdateSchema() ? schemaRegistryService
.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy)
: schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList ->
schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema)
@@ -604,4 +604,11 @@ public abstract class AbstractTopic implements Topic {
enableProducerReadForPublishRateLimiting();
}
}
+
+ private boolean allowAutoUpdateSchema() {
+ if (brokerService.isSystemTopic(topic)) {
+ return true;
+ }
+ return isAllowAutoUpdateSchema;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d9c015ea9e4..47ecd6e7cbf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2463,7 +2463,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
log.debug("No autoSubscriptionCreateOverride policy found for {}", topicName);
return null;
}
- private boolean isSystemTopic(String topic) {
+ public boolean isSystemTopic(String topic) {
return SystemTopicClient.isSystemTopic(TopicName.get(topic));
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index c8068404e28..48f234edea3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -24,11 +24,13 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCach
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -228,4 +230,22 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
}
});
}
+
+ @Test
+ public void testAutoCreateSchema() throws Exception {
+ String namespace = "system-topic/ns2";
+ String topic = namespace + "/test";
+ admin.namespaces().createNamespace(namespace);
+ admin.namespaces().setIsAllowAutoUpdateSchema(namespace, false);
+ admin.topics().createNonPartitionedTopic(topic);
+ TopicName changeEventTopicName =
+ NamespaceEventsSystemTopicFactory.getSystemTopicName(
+ TopicName.get(topic).getNamespaceObject(), EventType.TOPIC_POLICY);
+ Awaitility.await().untilAsserted(() -> {
+ SchemaInfo schemaInfo = admin
+ .schemas()
+ .getSchemaInfo(changeEventTopicName.toString());
+ Assert.assertNotNull(schemaInfo);
+ });
+ }
}