This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2a8afc9021a4145cc15244e9aa0f0aefa6087738 Author: Lari Hotari <[email protected]> AuthorDate: Tue Nov 25 11:53:02 2025 +0200 [fix][broker] Fix issue with schemaValidationEnforced in geo-replication (#25012) (cherry picked from commit ec609af4a319ad58600c497fd869dfad0869c226) --- ...eWayReplicatorSchemaValidationEnforcedTest.java | 103 +++++++++++++++++++++ .../pulsar/client/impl/PulsarClientImpl.java | 2 +- .../client/impl/schema/AutoProduceBytesSchema.java | 6 ++ 3 files changed, 110 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorSchemaValidationEnforcedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorSchemaValidationEnforcedTest.java new file mode 100644 index 00000000000..2dd94bb1cd1 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorSchemaValidationEnforcedTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.assertj.core.api.Assertions.assertThat; +import java.util.concurrent.TimeUnit; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.apache.pulsar.zookeeper.ZookeeperServerTest; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-replication") +public class OneWayReplicatorSchemaValidationEnforcedTest extends OneWayReplicatorTestBase { + + @Override + @BeforeClass(alwaysRun = true, timeOut = 300000) + public void setup() throws Exception { + super.setup(); + } + + @Override + @AfterClass(alwaysRun = true, timeOut = 300000) + public void cleanup() throws Exception { + super.cleanup(); + } + + @Data + private static class MyClass { + int field1; + String field2; + Long field3; + } + + @Override + protected void setConfigDefaults(ServiceConfiguration config, String clusterName, + LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) { + super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk); + config.setSchemaValidationEnforced(true); + // disable topic auto creation so that it's possible to reproduce the scenario consistently + config.setAllowAutoTopicCreation(false); + } + + @Test(timeOut = 30000) + public void testReplicationWithAvroSchemaWithSchemaValidationEnforced() throws Exception { + Schema<MyClass> myClassSchema = Schema.AVRO(MyClass.class); + final String topicName = + BrokerTestUtil.newUniqueName("persistent://" + sourceClusterAlwaysSchemaCompatibleNamespace + "/tp_"); + // create the topic and schema in the local cluster (r1) + admin1.topics().createNonPartitionedTopic(topicName); + admin1.schemas().createSchema(topicName, myClassSchema.getSchemaInfo()); + // create the topic and schema in the remote cluster (r2) + admin2.topics().createNonPartitionedTopic(topicName); + admin2.schemas().createSchema(topicName, myClassSchema.getSchemaInfo()); + + // consume from the remote cluster (r2) + Consumer<MyClass> consumer2 = client2.newConsumer(myClassSchema) + .topic(topicName).subscriptionName("sub").subscribe(); + + // produce to local cluster (r1) + Producer<MyClass> producer1 = client1.newProducer(myClassSchema).topic(topicName).create(); + MyClass sentBody = new MyClass(); + sentBody.setField1(1); + sentBody.setField2("test"); + sentBody.setField3(123456789L); + producer1.send(sentBody); + + // verify that the message was received from the remote cluster (r2) + Message<MyClass> received = consumer2.receive(10, TimeUnit.SECONDS); + assertThat(received).isNotNull(); + assertThat(received.getValue()).isNotNull().satisfies(receivedBody -> { + assertThat(receivedBody.getField1()).isEqualTo(1); + assertThat(receivedBody.getField2()).isEqualTo("test"); + assertThat(receivedBody.getField3()).isEqualTo(123456789L); + }); + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index b2e1c4d0a01..c25d2397229 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -377,7 +377,7 @@ public class PulsarClientImpl implements PulsarClient { if (schema instanceof AutoProduceBytesSchema) { AutoProduceBytesSchema autoProduceBytesSchema = (AutoProduceBytesSchema) schema; - if (autoProduceBytesSchema.schemaInitialized()) { + if (autoProduceBytesSchema.hasUserProvidedSchema()) { return createProducerAsync(topic, conf, schema, interceptors); } return lookup.getSchema(TopicName.get(conf.getTopicName())) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java index d19f1180f06..f522eebe78a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java @@ -35,12 +35,14 @@ public class AutoProduceBytesSchema<T> implements Schema<byte[]> { @Setter private boolean requireSchemaValidation = true; private Schema<T> schema; + private boolean userProvidedSchema; public AutoProduceBytesSchema() { } public AutoProduceBytesSchema(Schema<T> schema) { this.schema = schema; + this.userProvidedSchema = true; SchemaInfo schemaInfo = schema.getSchemaInfo(); this.requireSchemaValidation = schemaInfo != null && schemaInfo.getType() != SchemaType.BYTES @@ -62,6 +64,10 @@ public class AutoProduceBytesSchema<T> implements Schema<byte[]> { return schema != null; } + public boolean hasUserProvidedSchema() { + return userProvidedSchema; + } + @Override public void validate(byte[] message) { ensureSchemaInitialized();
