codelipenghui commented on a change in pull request #13938:
URL: https://github.com/apache/pulsar/pull/13938#discussion_r791528701



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -643,17 +643,26 @@ protected void setSchemaCompatibilityStrategy(Policies 
policies) {
         if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) {
             schemaCompatibilityStrategy =
                     
brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy();
-        } else if (policies.schema_compatibility_strategy == 
SchemaCompatibilityStrategy.UNDEFINED) {
-            schemaCompatibilityStrategy = brokerService.pulsar()
-                    .getConfig().getSchemaCompatibilityStrategy();
-            if (schemaCompatibilityStrategy == 
SchemaCompatibilityStrategy.UNDEFINED) {
-                schemaCompatibilityStrategy = 
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
-                        policies.schema_auto_update_compatibility_strategy);
-            }
-        } else {
-            schemaCompatibilityStrategy = 
policies.schema_compatibility_strategy;
+            return;
+        }
+
+        schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
+        if 
(!SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
+            return;
+        }
+
+        schemaCompatibilityStrategy = 
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
+                policies.schema_auto_update_compatibility_strategy);
+        if 
(!SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
+            return;
+        }
+
+        schemaCompatibilityStrategy = 
brokerService.pulsar().getConfig().getSchemaCompatibilityStrategy();
+        if 
(SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
+            schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL;

Review comment:
       We can simply to change the default value of the broker configuration to 
FULL, without checking if the broker level is undefined.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -2342,15 +2342,19 @@ protected SchemaCompatibilityStrategy 
internalGetSchemaCompatibilityStrategy() {
         validateNamespacePolicyOperation(namespaceName, 
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
                 PolicyOperation.READ);
         Policies policies = getNamespacePolicies(namespaceName);
+
         SchemaCompatibilityStrategy schemaCompatibilityStrategy = 
policies.schema_compatibility_strategy;
-        if (schemaCompatibilityStrategy == 
SchemaCompatibilityStrategy.UNDEFINED) {
-            schemaCompatibilityStrategy = 
pulsar().getConfig().getSchemaCompatibilityStrategy();
-            if (schemaCompatibilityStrategy == 
SchemaCompatibilityStrategy.UNDEFINED) {
-                schemaCompatibilityStrategy = SchemaCompatibilityStrategy
-                        
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
-            }
+        if 
(!SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
+            return schemaCompatibilityStrategy;
         }
-        return schemaCompatibilityStrategy;
+
+        schemaCompatibilityStrategy =
+                
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
+        if 
(!SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
+            return schemaCompatibilityStrategy;
+        }
+
+        return pulsar().getConfig().getSchemaCompatibilityStrategy();

Review comment:
       For getting the namespace policy, we should not return the broker-level 
configuration.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
##########
@@ -199,30 +188,23 @@ public void testCompatibility(PostSchemaPayload payload, 
boolean authoritative,
         validateDestinationAndAdminOperation(authoritative);
 
         String schemaId = getSchemaId();
-        Policies policies = getNamespacePolicies(namespaceName);
-
-        SchemaCompatibilityStrategy schemaCompatibilityStrategy;
-        if (policies.schema_compatibility_strategy == 
SchemaCompatibilityStrategy.UNDEFINED) {
-            schemaCompatibilityStrategy = SchemaCompatibilityStrategy
-                    
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
-        } else {
-            schemaCompatibilityStrategy = 
policies.schema_compatibility_strategy;
-        }
 
-        pulsar().getSchemaRegistryService()
-                .isCompatible(schemaId,
-                        
SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
-                                
.timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
-                                .user(defaultIfEmpty(clientAppId(), 
"")).props(payload.getProperties()).build(),
-                        schemaCompatibilityStrategy)
-                .thenAccept(isCompatible -> response.resume(Response.accepted()
-                        
.entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible)
-                                
.schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build())
-                        .build()))
-                .exceptionally(error -> {
-                    response.resume(new RestException(error));
-                    return null;
-                });
+        
getSchemaCompatibilityStrategyAsync().thenAccept(schemaCompatibilityStrategy -> 
{

Review comment:
       Miss exception handling for this block

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceSchemaCompatibilityStrategyTest.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.admin;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import 
org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiNamespaceSchemaCompatibilityStrategyTest extends 
MockedPulsarServiceBaseTest {

Review comment:
       And please also add a test for covering the broker level configuration

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
##########
@@ -55,7 +55,7 @@
     private static final String namespace = "test-namespace";
     private static final String namespaceName = PUBLIC_TENANT + "/" + 
namespace;
 
-    @BeforeClass
+    @BeforeMethod

Review comment:
       Why need to change to BeforeMethod?

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceSchemaCompatibilityStrategyTest.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.admin;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import 
org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiNamespaceSchemaCompatibilityStrategyTest extends 
MockedPulsarServiceBaseTest {

Review comment:
       Can we just add the test to `AdminApiSchemaTest`, so that we can avoid 
have many classes for the schema admin API test




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to