This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d3d5963bdcf [fix][client] Fix thread-safety of AutoProduceBytesSchema
(#25014)
d3d5963bdcf is described below
commit d3d5963bdcfaa7b03bef4d14bebba4924e2b5cf7
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Nov 27 04:08:41 2025 +0200
[fix][client] Fix thread-safety of AutoProduceBytesSchema (#25014)
---
.../client/impl/schema/AutoProduceBytesSchema.java | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index 88809748740..927ae94e027 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -33,11 +33,12 @@ import org.apache.pulsar.common.schema.SchemaType;
public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
@Setter
- private boolean requireSchemaValidation = true;
- private Schema<T> schema;
- private boolean userProvidedSchema;
+ private volatile boolean requireSchemaValidation = true;
+ private volatile Schema<T> schema;
+ private final boolean userProvidedSchema;
public AutoProduceBytesSchema() {
+ this.userProvidedSchema = false;
}
public AutoProduceBytesSchema(Schema<T> schema) {
@@ -87,11 +88,12 @@ public class AutoProduceBytesSchema<T> implements
Schema<byte[]> {
if (requireSchemaValidation) {
// verify if the message can be decoded by the underlying schema
- if (schema instanceof KeyValueSchema
- && ((KeyValueSchema)
schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
- ((KeyValueSchema) schema).getValueSchema().validate(message);
+ Schema<T> localSchema = schema;
+ if (localSchema instanceof KeyValueSchema && ((KeyValueSchema)
localSchema).getKeyValueEncodingType()
+ .equals(KeyValueEncodingType.SEPARATED)) {
+ ((KeyValueSchema)
localSchema).getValueSchema().validate(message);
} else {
- schema.validate(message);
+ localSchema.validate(message);
}
}