This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 101aee4543f [fix][schema] Error checking schema compatibility on a 
schema-less topic via REST API (#22720)
101aee4543f is described below

commit 101aee4543fb66035165d8744def630f9a9c3a59
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Thu May 16 17:36:57 2024 +0800

    [fix][schema] Error checking schema compatibility on a schema-less topic 
via REST API (#22720)
---
 .../schema/AvroSchemaBasedCompatibilityCheck.java  |  6 +++--
 .../ProtobufNativeSchemaCompatibilityCheck.java    |  4 ++-
 .../service/schema/SchemaRegistryServiceImpl.java  |  2 +-
 .../exceptions/IncompatibleSchemaException.java    |  4 +++
 .../pulsar/broker/admin/AdminApiSchemaTest.java    | 30 ++++++++++++++++++++++
 .../broker/service/schema/SchemaServiceTest.java   |  4 +--
 6 files changed, 43 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
index 1e75834a129..e5fc7800c51 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java
@@ -64,8 +64,10 @@ abstract class AvroSchemaBasedCompatibilityCheck implements 
SchemaCompatibilityC
             log.warn("Error during schema parsing: {}", e.getMessage());
             throw new IncompatibleSchemaException(e);
         } catch (SchemaValidationException e) {
-            log.warn("Error during schema compatibility check: {}", 
e.getMessage());
-            throw new IncompatibleSchemaException(e);
+            String msg = String.format("Error during schema compatibility 
check with strategy %s: %s: %s",
+                    strategy, e.getClass().getName(), e.getMessage());
+            log.warn(msg);
+            throw new IncompatibleSchemaException(msg, e);
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java
index 16b3b33ec78..fc935e80dca 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java
@@ -67,7 +67,9 @@ public class ProtobufNativeSchemaCompatibilityCheck 
implements SchemaCompatibili
     private void checkRootMessageChange(Descriptor fromDescriptor, Descriptor 
toDescriptor,
                                             SchemaCompatibilityStrategy 
strategy) throws IncompatibleSchemaException {
         if (!fromDescriptor.getFullName().equals(toDescriptor.getFullName())) {
-            throw new IncompatibleSchemaException("Protobuf root message isn't 
allow change!");
+            throw new IncompatibleSchemaException("Protobuf root message 
change is not allowed under the '"
+                    + strategy + "' strategy. Original message name: '" + 
fromDescriptor.getFullName()
+                    + "', new message name: '" + toDescriptor.getFullName() + 
"'.");
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index ae56df248d8..903f57cb780 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -473,7 +473,7 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
                 }
                 return result;
             } else {
-                return FutureUtils.exception(new 
IncompatibleSchemaException("Do not have existing schema."));
+                return CompletableFuture.completedFuture(null);
             }
         });
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java
index c1a2d9fd703..bbe2f4111d7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java
@@ -33,6 +33,10 @@ public class IncompatibleSchemaException extends 
SchemaException {
         super(message);
     }
 
+    public IncompatibleSchemaException(String message, Throwable e) {
+        super(message, e);
+    }
+
     public IncompatibleSchemaException(Throwable e) {
         super(e);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
index f67bd6fcfce..34d7dbeb818 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
@@ -467,4 +467,34 @@ public class AdminApiSchemaTest extends 
MockedPulsarServiceBaseTest {
             assertTrue(e.getMessage().contains("Incompatible schema: exists 
schema type STRING, new schema type INT8"));
         }
     }
+
+    @Test
+    public void testCompatibilityWithEmpty() throws Exception {
+        List<Schema<?>> checkSchemas = List.of(
+                Schema.STRING,
+                
Schema.JSON(SchemaDefinition.builder().withPojo(Foo.class).withProperties(PROPS).build()),
+                
Schema.AVRO(SchemaDefinition.builder().withPojo(Foo.class).withProperties(PROPS).build()),
+                Schema.KeyValue(Schema.STRING, Schema.STRING)
+        );
+        for (Schema<?> schema : checkSchemas) {
+            SchemaInfo schemaInfo = schema.getSchemaInfo();
+            String topicName = schemaCompatibilityNamespace + 
"/testCompatibilityWithEmpty";
+            PostSchemaPayload postSchemaPayload = new 
PostSchemaPayload(schemaInfo.getType().toString(),
+                    schemaInfo.getSchemaDefinition(), new HashMap<>());
+
+            // check compatibility with empty schema
+            IsCompatibilityResponse isCompatibilityResponse =
+                    admin.schemas().testCompatibility(topicName, 
postSchemaPayload);
+            assertTrue(isCompatibilityResponse.isCompatibility());
+            
assertEquals(isCompatibilityResponse.getSchemaCompatibilityStrategy(), 
SchemaCompatibilityStrategy.FULL.name());
+
+            // set schema compatibility strategy is FULL_TRANSITIVE to cover 
checkCompatibilityWithAll
+            
admin.namespaces().setSchemaCompatibilityStrategy(schemaCompatibilityNamespace, 
SchemaCompatibilityStrategy.FULL_TRANSITIVE);
+            isCompatibilityResponse = 
admin.schemas().testCompatibility(topicName, postSchemaPayload);
+            assertTrue(isCompatibilityResponse.isCompatibility());
+            
assertEquals(isCompatibilityResponse.getSchemaCompatibilityStrategy(), 
SchemaCompatibilityStrategy.FULL_TRANSITIVE.name());
+            // set back to FULL
+            
admin.namespaces().setSchemaCompatibilityStrategy(schemaCompatibilityNamespace, 
SchemaCompatibilityStrategy.FULL);
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index 3a4016eb79c..fbf8c5cc154 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.service.schema;
 
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
-import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.fail;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertFalse;
@@ -48,7 +47,6 @@ import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import 
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -407,7 +405,7 @@ public class SchemaServiceTest extends 
MockedPulsarServiceBaseTest {
                         .build(),
                 SchemaInfo.builder().type(SchemaType.BOOLEAN).schema(new 
byte[0])
                         .build(), KeyValueEncodingType.SEPARATED);
-        assertThrows(PulsarAdminException.ServerSideErrorException.class, () 
-> admin.schemas().testCompatibility(topicName, schemaInfo));
+        Assert.assertTrue(admin.schemas().testCompatibility(topicName, 
schemaInfo).isCompatibility());
         admin.schemas().createSchema(topicName, schemaInfo);
 
         final IsCompatibilityResponse isCompatibilityResponse = 
admin.schemas().testCompatibility(topicName, schemaInfo);

Reply via email to