This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new de57f98  Allow to configure schema compatibility policy for system topics (#12598)
de57f98 is described below

commit de57f98692c64ae5a1c6d5f79f4fbb922f946cb5
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Nov 3 22:10:39 2021 -0700

    Allow to configure schema compatibility policy for system topics (#12598)
    
    (cherry picked from commit 7aea58d293ba2ca29e0acbf4cfd5733d84846120)
---
 conf/broker.conf                                   |  3 +++
 .../apache/pulsar/broker/ServiceConfiguration.java |  8 +++++++
 .../pulsar/broker/service/AbstractTopic.java       | 11 ++++++++--
 .../NamespaceEventsSystemTopicFactory.java         |  7 ++++++
 .../NamespaceEventsSystemTopicServiceTest.java     | 25 ++++++++++++++++++++++
 5 files changed, 52 insertions(+), 2 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 0cd4e1e..9534ed7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -455,6 +455,9 @@ zookeeperSessionExpiredPolicy=shutdown
 # Enable or disable system topic
 systemTopicEnabled=false
 
+# The schema compatibility strategy to use for system topics
+systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE
+
 # Enable or disable topic level policies, topic level policies depends on the system topic
 # Please enable the system topic first.
 topicLevelPoliciesEnabled=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 38adecf..70f9cbe 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.sasl.SaslConstants;
@@ -860,6 +861,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private boolean systemTopicEnabled = false;
 
     @FieldContext(
+            category = CATEGORY_SCHEMA,
+            doc = "The schema compatibility strategy to use for system topics"
+    )
+    private SchemaCompatibilityStrategy systemTopicSchemaCompatibilityStrategy =
+            SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE;
+
+    @FieldContext(
         category = CATEGORY_SERVER,
         doc = "Enable or disable topic level policies, topic level policies depends on the system topic, " +
                 "please enable the system topic first.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 339e89d..d568ac2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import lombok.Getter;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -39,6 +40,7 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
@@ -84,6 +86,8 @@ public abstract class AbstractTopic implements Topic {
 
     // Whether messages published must be encrypted or not in this topic
     protected volatile boolean isEncryptionRequired = false;
+
+    @Getter
     protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
             SchemaCompatibilityStrategy.FULL;
     protected volatile boolean isAllowAutoUpdateSchema = true;
@@ -313,8 +317,11 @@ public abstract class AbstractTopic implements Topic {
         PUBLISH_LATENCY.observe(latency, unit);
     }
 
-    protected void setSchemaCompatibilityStrategy (Policies policies) {
-        if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
+    protected void setSchemaCompatibilityStrategy(Policies policies) {
+        if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) {
+            schemaCompatibilityStrategy =
+                    brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy();
+        } else if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
             schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
                     policies.schema_auto_update_compatibility_strategy);
         } else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
index 911a997..266c30e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
@@ -34,6 +34,13 @@ public class NamespaceEventsSystemTopicFactory {
         this.client = client;
     }
 
+    public TopicPoliciesSystemTopicClient createTopicPoliciesSystemTopicClient(NamespaceName namespaceName) {
+        TopicName topicName = TopicName.get("persistent", namespaceName,
+                EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        log.info("Create topic policies system topic client {}", topicName.toString());
+        return new TopicPoliciesSystemTopicClient(client, topicName);
+    }
+
     public SystemTopicClient createSystemTopic(NamespaceName namespaceName, EventType eventType) {
         TopicName topicName = getSystemTopicName(namespaceName, eventType);
         if (topicName != null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index 1363e1d..1995987 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -19,9 +19,14 @@
 package org.apache.pulsar.broker.systopic;
 
 import com.google.common.collect.Sets;
+import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.events.ActionType;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.events.EventsTopicNames;
@@ -31,6 +36,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +69,25 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa
     }
 
     @Test
+    public void testSchemaCompatibility() throws Exception {
+        TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory
+                .createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1));
+        String topicName = systemTopicClientForNamespace1.getTopicName().toString();
+        @Cleanup
+        Reader<byte[]> reader = pulsarClient.newReader(Schema.BYTES)
+                .topic(topicName)
+                .startMessageId(MessageId.earliest)
+                .create();
+
+        PersistentTopic topic =
+                (PersistentTopic) pulsar.getBrokerService()
+                        .getTopic(topicName, false)
+                        .join().get();
+
+        Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy());
+    }
+
+    @Test
     public void testSendAndReceiveNamespaceEvents() throws Exception {
         SystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory.createSystemTopic(NamespaceName.get(NAMESPACE1), EventType.TOPIC_POLICY);
         TopicPolicies policies = TopicPolicies.builder()

Reply via email to