This is an automated email from the ASF dual-hosted git repository.
rgao pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 368381aa1eb [fix][client] Fix potential NPE in TypedMessageBuilderImpl
(#24691)
368381aa1eb is described below
commit 368381aa1eb2c4645d9b0b5848f262c98642aad2
Author: ran <[email protected]>
AuthorDate: Mon Sep 1 17:02:46 2025 +0800
[fix][client] Fix potential NPE in TypedMessageBuilderImpl (#24691)
(cherry picked from commit 5e9e9129d3c082779d05b808b170f0e7823c42ed)
---
.../apache/pulsar/client/impl/TypedMessageBuilderImpl.java | 13 +++++++++----
.../pulsar/client/impl/TypedMessageBuilderImplTest.java | 9 +++++++++
2 files changed, 18 insertions(+), 4 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index e2bb4b0cf97..35c72e2bc54 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -80,7 +80,7 @@ public class TypedMessageBuilderImpl<T> implements
TypedMessageBuilder<T> {
return null;
}
}).orElseGet(() -> {
- EncodeData encodeData = schema.encode(producer.topic, value);
+ EncodeData encodeData = schema.encode(getTopic(), value);
content = ByteBuffer.wrap(encodeData.data());
if (encodeData.hasSchemaId()) {
msgMetadata.setSchemaId(encodeData.schemaId());
@@ -275,7 +275,7 @@ public class TypedMessageBuilderImpl<T> implements
TypedMessageBuilder<T> {
public Message<T> getMessage() {
beforeSend();
- return MessageImpl.create(msgMetadata, content, schema, producer !=
null ? producer.getTopic() : null);
+ return MessageImpl.create(msgMetadata, content, schema, getTopic());
}
public long getPublishTime() {
@@ -314,7 +314,7 @@ public class TypedMessageBuilderImpl<T> implements
TypedMessageBuilder<T> {
EncodeData keyEncoded = null;
// set key as the message key
if (keyValue.getKey() != null) {
- keyEncoded = keyValueSchema.getKeySchema().encode(producer.topic,
keyValue.getKey());
+ keyEncoded = keyValueSchema.getKeySchema().encode(getTopic(),
keyValue.getKey());
msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString(keyEncoded.data()));
msgMetadata.setPartitionKeyB64Encoded(true);
} else {
@@ -324,7 +324,7 @@ public class TypedMessageBuilderImpl<T> implements
TypedMessageBuilder<T> {
EncodeData valueEncoded = null;
// set value as the payload
if (keyValue.getValue() != null) {
- valueEncoded =
keyValueSchema.getValueSchema().encode(producer.topic, keyValue.getValue());
+ valueEncoded = keyValueSchema.getValueSchema().encode(getTopic(),
keyValue.getValue());
content = ByteBuffer.wrap(valueEncoded.data());
} else {
msgMetadata.setNullValue(true);
@@ -337,4 +337,9 @@ public class TypedMessageBuilderImpl<T> implements
TypedMessageBuilder<T> {
msgMetadata.setSchemaId(schemaId);
}
}
+
+ private String getTopic() {
+ return producer != null ? producer.getTopic() : null;
+ }
+
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
index 6d2af96dd04..257fc6885b7 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
@@ -272,4 +272,13 @@ public class TypedMessageBuilderImplTest {
}
}
+ @Test
+ public void testGetMessageWithNullProducer() {
+ TypedMessageBuilderImpl<byte[]> builder = new
TypedMessageBuilderImpl<>(null, Schema.BYTES);
+ var data = "test".getBytes();
+ builder.value(data);
+ var message = builder.getMessage();
+ assertEquals(message.getValue(), data);
+ }
+
}