This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0d2022dea5db7fa9057a769dcaacfd904c337b43
Author: feynmanlin <feynman...@tencent.com>
AuthorDate: Fri Nov 12 13:43:50 2021 +0800

    Even if always compatible is set, Consumers cannot be created (#12721)
    
    (cherry picked from commit c3da1452a444c9599cb85562a3faa82ddfdecec8)
---
 .../service/schema/SchemaRegistryServiceImpl.java  |  3 +++
 .../SchemaCompatibilityCheckTest.java              | 22 ++++++++++++++++++++++
 2 files changed, 25 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 0eff36b..e1c9b13 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -270,6 +270,9 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
     @Override
     public CompletableFuture<Void> checkConsumerCompatibility(String schemaId, SchemaData schemaData,
                                                               SchemaCompatibilityStrategy strategy) {
+        if (SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE == strategy) {
+            return CompletableFuture.completedFuture(null);
+        }
         return getSchema(schemaId).thenCompose(existingSchema -> {
             if (existingSchema != null && !existingSchema.schema.isDeleted()) {
                 if (strategy == SchemaCompatibilityStrategy.BACKWARD
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 02913c6..293f71d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -395,6 +395,28 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
 
     }
 
+    @Test
+    public void testAutoProduceSchemaAlwaysCompatible() throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String topic = "topic" + randomName(16);
+
+        String namespace = "test-namespace-" + randomName(16);
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(), tenant, namespace, topic).toString();
+        NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
+        admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));
+
+        // set ALWAYS_COMPATIBLE
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+        Producer producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
+        // should not fail
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).subscriptionName("my-sub").topic(topicName).subscribe();
+
+        producer.close();
+        consumer.close();
+    }
+
     @Test(dataProvider =  "CanReadLastSchemaCompatibilityStrategy")
     public void testConsumerWithNotCompatibilitySchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
         final String tenant = PUBLIC_TENANT;

Reply via email to