This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 8b62272  [Issue 12757][broker] add broker config 
isAllowAutoUpdateSchema (#12786)
8b62272 is described below

commit 8b622722dcb2c9d99e40b9e9d5b713a845677f42
Author: JiangHaiting <jianghait...@apache.org>
AuthorDate: Thu Nov 18 20:37:37 2021 +0800

    [Issue 12757][broker] add broker config isAllowAutoUpdateSchema (#12786)
    
    (cherry picked from commit fa7be236efcc6772e0aac05f25f8d5f3cf0ad741)
---
 conf/broker.conf                                   |  4 ++
 conf/standalone.conf                               |  4 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  8 +++
 .../pulsar/broker/service/AbstractTopic.java       | 38 ++++++----
 .../SchemaCompatibilityCheckTest.java              | 81 ++++++++++++++++++++++
 .../pulsar/common/policies/data/Policies.java      |  2 +-
 6 files changed, 121 insertions(+), 16 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 552bdb4..a51a5f7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -264,6 +264,10 @@ brokerMaxConnections=0
 # The maximum number of connections per IP. If it exceeds, new connections are 
rejected.
 brokerMaxConnectionsPerIp=0
 
+# Allow schema to be auto updated at broker level. User can override this by
+# 'is_allow_auto_update_schema' of namespace policy.
+isAllowAutoUpdateSchemaEnabled=true
+
 # Enable check for minimum allowed client library version
 clientLibraryVersionCheckEnabled=false
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 7f52eca..d8c1370 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -169,6 +169,10 @@ defaultNumberOfNamespaceBundles=4
 # Using a value of 0, is disabling maxTopicsPerNamespace-limit check.
 maxTopicsPerNamespace=0
 
+# Allow schema to be auto updated at broker level. User can override this by
+# 'is_allow_auto_update_schema' of namespace policy.
+isAllowAutoUpdateSchemaEnabled=true
+
 # Enable check for minimum allowed client library version
 clientLibraryVersionCheckEnabled=false
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5eb0a6b..21c0e5f 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -554,6 +554,14 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     private int brokerMaxConnectionsPerIp = 0;
 
     @FieldContext(
+        category = CATEGORY_POLICIES,
+        dynamic = true,
+        doc = "Allow schema to be auto updated at broker level. User can 
override this by 'is_allow_auto_update_schema'"
+            + " of namespace policy. This is enabled by default."
+    )
+    private boolean isAllowAutoUpdateSchemaEnabled = true;
+
+    @FieldContext(
         category = CATEGORY_SERVER,
         dynamic = true,
         doc = "Enable check for minimum allowed client library version"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index ebd05b6..5d24147 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -98,7 +98,7 @@ public abstract class AbstractTopic implements Topic {
     @Getter
     protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy 
=
             SchemaCompatibilityStrategy.FULL;
-    protected volatile boolean isAllowAutoUpdateSchema = true;
+    protected volatile Boolean isAllowAutoUpdateSchema;
     // schema validation enforced flag
     protected volatile boolean schemaValidationEnforced = false;
 
@@ -330,20 +330,28 @@ public abstract class AbstractTopic implements Topic {
         String base = TopicName.get(getName()).getPartitionedTopicName();
         String id = TopicName.get(base).getSchemaName();
         SchemaRegistryService schemaRegistryService = 
brokerService.pulsar().getSchemaRegistryService();
-        return isAllowAutoUpdateSchema ? schemaRegistryService
-                .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy)
-                : 
schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList
 ->
-                
schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, 
schema)
-                        .thenCompose(schemaVersion -> {
-                    if (schemaVersion == null) {
-                        return FutureUtil
-                                .failedFuture(
-                                        new IncompatibleSchemaException(
-                                                "Schema not found and schema 
auto updating is disabled."));
-                    } else {
-                        return 
CompletableFuture.completedFuture(schemaVersion);
-                    }
-                }));
+
+        if (allowAutoUpdateSchema()) {
+            return schemaRegistryService.putSchemaIfAbsent(id, schema, 
schemaCompatibilityStrategy);
+        } else {
+            return 
schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList
 ->
+                    
schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, 
schema)
+                            .thenCompose(schemaVersion -> {
+                                if (schemaVersion == null) {
+                                    return FutureUtil.failedFuture(new 
IncompatibleSchemaException(
+                                            "Schema not found and schema auto 
updating is disabled."));
+                                } else {
+                                    return 
CompletableFuture.completedFuture(schemaVersion);
+                                }
+                            }));
+        }
+    }
+
+    private boolean allowAutoUpdateSchema() {
+        if (isAllowAutoUpdateSchema == null) {
+            return 
brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
+        }
+        return isAllowAutoUpdateSchema;
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 293f71d..80168b9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -218,6 +218,87 @@ public class SchemaCompatibilityCheckTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
+    public void 
testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy 
schemaCompatibilityStrategy)
+            throws Exception {
+
+        final String tenant = PUBLIC_TENANT;
+        final String topic = "test-consumer-compatibility";
+        String namespace = "test-namespace-" + randomName(16);
+        String fqtn = TopicName.get(
+                TopicDomain.persistent.value(),
+                tenant,
+                namespace,
+                topic
+        ).toString();
+
+        NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
+
+        admin.namespaces().createNamespace(
+                tenant + "/" + namespace,
+                Sets.newHashSet(CLUSTER_NAME)
+        );
+
+        
assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
+                SchemaCompatibilityStrategy.FULL);
+
+        
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), 
schemaCompatibilityStrategy);
+        admin.schemas().createSchema(fqtn, 
Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
+
+
+        pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);
+        ProducerBuilder<Schemas.PersonTwo> producerThreeBuilder = pulsarClient
+                
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+                                (false).withSupportSchemaVersioning(true).
+                        withPojo(Schemas.PersonTwo.class).build()))
+                .topic(fqtn);
+        try {
+            producerThreeBuilder.create();
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("Schema not found and 
schema auto updating is disabled."));
+        }
+
+        pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(true);
+        ConsumerBuilder<Schemas.PersonTwo> comsumerBuilder = 
pulsarClient.newConsumer(Schema.AVRO(
+                        
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+                                        
(false).withSupportSchemaVersioning(true).
+                                withPojo(Schemas.PersonTwo.class).build()))
+                .subscriptionName("test")
+                .topic(fqtn);
+
+        Producer<Schemas.PersonTwo> producer = producerThreeBuilder.create();
+        Consumer<Schemas.PersonTwo> consumerTwo = comsumerBuilder.subscribe();
+
+        producer.send(new Schemas.PersonTwo(2, "Lucy"));
+        Message<Schemas.PersonTwo> message = consumerTwo.receive();
+
+        Schemas.PersonTwo personTwo = message.getValue();
+        consumerTwo.acknowledge(message);
+
+        assertEquals(personTwo.getId(), 2);
+        assertEquals(personTwo.getName(), "Lucy");
+
+        producer.close();
+        consumerTwo.close();
+
+        pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);
+
+        producer = producerThreeBuilder.create();
+        consumerTwo = comsumerBuilder.subscribe();
+
+        producer.send(new Schemas.PersonTwo(2, "Lucy"));
+        message = consumerTwo.receive();
+
+        personTwo = message.getValue();
+        consumerTwo.acknowledge(message);
+
+        assertEquals(personTwo.getId(), 2);
+        assertEquals(personTwo.getName(), "Lucy");
+
+        consumerTwo.close();
+        producer.close();
+    }
+
     @Test(dataProvider =  "AllCheckSchemaCompatibilityStrategy")
     public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy 
schemaCompatibilityStrategy) throws Exception {
         final String tenant = PUBLIC_TENANT;
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index ff773f6..8622715 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -107,7 +107,7 @@ public class Policies {
     public SchemaCompatibilityStrategy schema_compatibility_strategy = 
SchemaCompatibilityStrategy.UNDEFINED;
 
     @SuppressWarnings("checkstyle:MemberName")
-    public boolean is_allow_auto_update_schema = true;
+    public Boolean is_allow_auto_update_schema = null;
 
     @SuppressWarnings("checkstyle:MemberName")
     public boolean schema_validation_enforced = false;

Reply via email to