This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 5e9e9129d3c [fix][client] Fix potential NPE in TypedMessageBuilderImpl (#24691)
5e9e9129d3c is described below

commit 5e9e9129d3c082779d05b808b170f0e7823c42ed
Author: ran <r...@streamnative.io>
AuthorDate: Mon Sep 1 17:02:46 2025 +0800

    [fix][client] Fix potential NPE in TypedMessageBuilderImpl (#24691)
---
 .../apache/pulsar/client/impl/TypedMessageBuilderImpl.java  | 13 +++++++++----
 .../pulsar/client/impl/TypedMessageBuilderImplTest.java     |  9 +++++++++
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index e2bb4b0cf97..35c72e2bc54 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -80,7 +80,7 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
                 return null;
             }
         }).orElseGet(() -> {
-            EncodeData encodeData = schema.encode(producer.topic, value);
+            EncodeData encodeData = schema.encode(getTopic(), value);
             content = ByteBuffer.wrap(encodeData.data());
             if (encodeData.hasSchemaId()) {
                 msgMetadata.setSchemaId(encodeData.schemaId());
@@ -275,7 +275,7 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
 
     public Message<T> getMessage() {
         beforeSend();
-        return MessageImpl.create(msgMetadata, content, schema, producer != null ? producer.getTopic() : null);
+        return MessageImpl.create(msgMetadata, content, schema, getTopic());
     }
 
     public long getPublishTime() {
@@ -314,7 +314,7 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
         EncodeData keyEncoded = null;
         // set key as the message key
         if (keyValue.getKey() != null) {
-            keyEncoded = keyValueSchema.getKeySchema().encode(producer.topic, keyValue.getKey());
+            keyEncoded = keyValueSchema.getKeySchema().encode(getTopic(), keyValue.getKey());
             msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString(keyEncoded.data()));
             msgMetadata.setPartitionKeyB64Encoded(true);
         } else {
@@ -324,7 +324,7 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
         EncodeData valueEncoded = null;
         // set value as the payload
         if (keyValue.getValue() != null) {
-            valueEncoded = keyValueSchema.getValueSchema().encode(producer.topic, keyValue.getValue());
+            valueEncoded = keyValueSchema.getValueSchema().encode(getTopic(), keyValue.getValue());
             content = ByteBuffer.wrap(valueEncoded.data());
         } else {
             msgMetadata.setNullValue(true);
@@ -337,4 +337,9 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
             msgMetadata.setSchemaId(schemaId);
         }
     }
+
+    private String getTopic() {
+        return producer != null ? producer.getTopic() : null;
+    }
+
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
index 6d2af96dd04..257fc6885b7 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
@@ -272,4 +272,13 @@ public class TypedMessageBuilderImplTest {
         }
     }
 
+    @Test
+    public void testGetMessageWithNullProducer() {
+        TypedMessageBuilderImpl<byte[]> builder = new TypedMessageBuilderImpl<>(null, Schema.BYTES);
+        var data = "test".getBytes();
+        builder.value(data);
+        var message = builder.getMessage();
+        assertEquals(message.getValue(), data);
+    }
+
 }