This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 469df5c6f08 [fix][client] Fix thread-safety of AutoProduceBytesSchema
(#25014)
469df5c6f08 is described below
commit 469df5c6f08a56cff5067c5709b842d50bae9c5b
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Nov 27 04:08:41 2025 +0200
[fix][client] Fix thread-safety of AutoProduceBytesSchema (#25014)
---
.../client/impl/schema/AutoProduceBytesSchema.java | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index cc134b57b21..9abbf81576d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -33,11 +33,12 @@ import org.apache.pulsar.common.schema.SchemaType;
public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
@Setter
- private boolean requireSchemaValidation = true;
- private Schema<T> schema;
- private boolean userProvidedSchema;
+ private volatile boolean requireSchemaValidation = true;
+ private volatile Schema<T> schema;
+ private final boolean userProvidedSchema;
public AutoProduceBytesSchema() {
+ this.userProvidedSchema = false;
}
public AutoProduceBytesSchema(Schema<T> schema) {
@@ -81,11 +82,12 @@ public class AutoProduceBytesSchema<T> implements
Schema<byte[]> {
if (requireSchemaValidation) {
// verify if the message can be decoded by the underlying schema
- if (schema instanceof KeyValueSchema
- && ((KeyValueSchema)
schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
- ((KeyValueSchema) schema).getValueSchema().validate(message);
+ Schema<T> localSchema = schema;
+ if (localSchema instanceof KeyValueSchema && ((KeyValueSchema)
localSchema).getKeyValueEncodingType()
+ .equals(KeyValueEncodingType.SEPARATED)) {
+ ((KeyValueSchema)
localSchema).getValueSchema().validate(message);
} else {
- schema.validate(message);
+ localSchema.validate(message);
}
}