This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push: new de57f98 Allow to configure schema compatibility policy for system topics (#12598) de57f98 is described below commit de57f98692c64ae5a1c6d5f79f4fbb922f946cb5 Author: Matteo Merli <mme...@apache.org> AuthorDate: Wed Nov 3 22:10:39 2021 -0700 Allow to configure schema compatibility policy for system topics (#12598) (cherry picked from commit 7aea58d293ba2ca29e0acbf4cfd5733d84846120) --- conf/broker.conf | 3 +++ .../apache/pulsar/broker/ServiceConfiguration.java | 8 +++++++ .../pulsar/broker/service/AbstractTopic.java | 11 ++++++++-- .../NamespaceEventsSystemTopicFactory.java | 7 ++++++ .../NamespaceEventsSystemTopicServiceTest.java | 25 ++++++++++++++++++++++ 5 files changed, 52 insertions(+), 2 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 0cd4e1e..9534ed7 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -455,6 +455,9 @@ zookeeperSessionExpiredPolicy=shutdown # Enable or disable system topic systemTopicEnabled=false +# The schema compatibility strategy to use for system topics +systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE + # Enable or disable topic level policies, topic level policies depends on the system topic # Please enable the system topic first. topicLevelPoliciesEnabled=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 38adecf..70f9cbe 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -41,6 +41,7 @@ import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.sasl.SaslConstants; @@ -860,6 +861,13 @@ public class ServiceConfiguration implements PulsarConfiguration { private boolean systemTopicEnabled = false; @FieldContext( + category = CATEGORY_SCHEMA, + doc = "The schema compatibility strategy to use for system topics" + ) + private SchemaCompatibilityStrategy systemTopicSchemaCompatibilityStrategy = + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE; + + @FieldContext( category = CATEGORY_SERVER, doc = "Enable or disable topic level policies, topic level policies depends on the system topic, " + "please enable the system topic first.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 339e89d..d568ac2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import lombok.Getter; import org.apache.bookkeeper.mledger.util.StatsBuckets; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; @@ -39,6 +40,7 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; +import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; @@ -84,6 +86,8 @@ public abstract class AbstractTopic implements Topic { // Whether messages published must be encrypted or not in this topic protected volatile boolean isEncryptionRequired = false; + + @Getter protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL; protected volatile boolean isAllowAutoUpdateSchema = true; @@ -313,8 +317,11 @@ public abstract class AbstractTopic implements Topic { PUBLISH_LATENCY.observe(latency, unit); } - protected void setSchemaCompatibilityStrategy (Policies policies) { - if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) { + protected void setSchemaCompatibilityStrategy(Policies policies) { + if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) { + schemaCompatibilityStrategy = + brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy(); + } else if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) { schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy( policies.schema_auto_update_compatibility_strategy); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java index 911a997..266c30e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java @@ -34,6 +34,13 @@ public class NamespaceEventsSystemTopicFactory { this.client = client; } + public TopicPoliciesSystemTopicClient createTopicPoliciesSystemTopicClient(NamespaceName namespaceName) { + TopicName topicName = TopicName.get("persistent", namespaceName, + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); + log.info("Create topic policies system topic client {}", topicName.toString()); + return new TopicPoliciesSystemTopicClient(client, topicName); + } + public SystemTopicClient createSystemTopic(NamespaceName namespaceName, EventType eventType) { TopicName topicName = getSystemTopicName(namespaceName, eventType); if (topicName != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java index 1363e1d..1995987 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java @@ -19,9 +19,14 @@ package org.apache.pulsar.broker.systopic; import com.google.common.collect.Sets; +import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.events.ActionType; import org.apache.pulsar.common.events.EventType; import org.apache.pulsar.common.events.EventsTopicNames; @@ -31,6 +36,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +69,25 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa } @Test + public void testSchemaCompatibility() throws Exception { + TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory + .createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1)); + String topicName = systemTopicClientForNamespace1.getTopicName().toString(); + @Cleanup + Reader<byte[]> reader = pulsarClient.newReader(Schema.BYTES) + .topic(topicName) + .startMessageId(MessageId.earliest) + .create(); + + PersistentTopic topic = + (PersistentTopic) pulsar.getBrokerService() + .getTopic(topicName, false) + .join().get(); + + Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy()); + } + + @Test public void testSendAndReceiveNamespaceEvents() throws Exception { SystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory.createSystemTopic(NamespaceName.get(NAMESPACE1), EventType.TOPIC_POLICY); TopicPolicies policies = TopicPolicies.builder()