This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0d2022dea5db7fa9057a769dcaacfd904c337b43 Author: feynmanlin <feynman...@tencent.com> AuthorDate: Fri Nov 12 13:43:50 2021 +0800 Even if always compatible is set, Consumers cannot be created (#12721) (cherry picked from commit c3da1452a444c9599cb85562a3faa82ddfdecec8) --- .../service/schema/SchemaRegistryServiceImpl.java | 3 +++ .../SchemaCompatibilityCheckTest.java | 22 ++++++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index 0eff36b..e1c9b13 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -270,6 +270,9 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService { @Override public CompletableFuture<Void> checkConsumerCompatibility(String schemaId, SchemaData schemaData, SchemaCompatibilityStrategy strategy) { + if (SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE == strategy) { + return CompletableFuture.completedFuture(null); + } return getSchema(schemaId).thenCompose(existingSchema -> { if (existingSchema != null && !existingSchema.schema.isDeleted()) { if (strategy == SchemaCompatibilityStrategy.BACKWARD diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java index 02913c6..293f71d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java @@ -395,6 +395,28 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest { } + @Test + public void testAutoProduceSchemaAlwaysCompatible() throws Exception { + final String tenant = PUBLIC_TENANT; + final String topic = "topic" + randomName(16); + + String namespace = "test-namespace-" + randomName(16); + String topicName = TopicName.get( + TopicDomain.persistent.value(), tenant, namespace, topic).toString(); + NamespaceName namespaceName = NamespaceName.get(tenant, namespace); + admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME)); + + // set ALWAYS_COMPATIBLE + admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + + Producer producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create(); + // should not fail + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).subscriptionName("my-sub").topic(topicName).subscribe(); + + producer.close(); + consumer.close(); + } + @Test(dataProvider = "CanReadLastSchemaCompatibilityStrategy") public void testConsumerWithNotCompatibilitySchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception { final String tenant = PUBLIC_TENANT;