This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 93f10ef  Support get the producer name of a message (#524)
93f10ef is described below

commit 93f10ef76f4e2301472a0ec39ddad8287c1205ee
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Nov 19 17:04:59 2025 +0800

    Support get the producer name of a message (#524)
---
 include/pulsar/Message.h       |  7 +++++++
 include/pulsar/c/message.h     |  6 ++++++
 lib/Message.cc                 |  7 +++++++
 lib/c/c_Message.cc             |  4 ++++
 tests/BasicEndToEndTest.cc     | 13 ++++++++++++-
 tests/MessageTest.cc           |  1 +
 tests/c/c_BasicEndToEndTest.cc | 11 +++++++++--
 7 files changed, 46 insertions(+), 3 deletions(-)

diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index 2e78e98..ea4c4ab 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -195,6 +195,13 @@ class PULSAR_PUBLIC Message {
      */
     const std::string& getSchemaVersion() const;
 
+    /**
+     * Get the producer name which produced this message.
+     *
+     * @return the producer name or empty string if not available
+     */
+    const std::string& getProducerName() const noexcept;
+
     bool operator==(const Message& msg) const;
 
    protected:
diff --git a/include/pulsar/c/message.h b/include/pulsar/c/message.h
index 30f1bad..1f1f91f 100644
--- a/include/pulsar/c/message.h
+++ b/include/pulsar/c/message.h
@@ -215,6 +215,12 @@ PULSAR_PUBLIC const char *pulsar_message_get_schemaVersion(pulsar_message_t *mes
 
 PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message, const char *schemaVersion);
 
+/**
+ * Returns the producer name which produced this message. The pointer points to an internal string, so the
+ * caller should not free it.
+ */
+PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t *message);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/Message.cc b/lib/Message.cc
index 2a6e8a3..1e26b52 100644
--- a/lib/Message.cc
+++ b/lib/Message.cc
@@ -213,6 +213,13 @@ uint64_t Message::getPublishTimestamp() const { return impl_ ? impl_->getPublish
 
 uint64_t Message::getEventTimestamp() const { return impl_ ? impl_->getEventTimestamp() : 0ull; }
 
+const std::string& Message::getProducerName() const noexcept {
+    if (!impl_) {
+        return emptyString;
+    }
+    return impl_->metadata.producer_name();
+}
+
 bool Message::operator==(const Message& msg) const { return getMessageId() == msg.getMessageId(); }
 
 KeyValue Message::getKeyValueData() const { return KeyValue(impl_->keyValuePtr); }
diff --git a/lib/c/c_Message.cc b/lib/c/c_Message.cc
index 3d30095..02c4ce3 100644
--- a/lib/c/c_Message.cc
+++ b/lib/c/c_Message.cc
@@ -140,3 +140,7 @@ const char *pulsar_message_get_schemaVersion(pulsar_message_t *message) {
 int pulsar_message_has_schema_version(pulsar_message_t *message) {
     return message->message.hasSchemaVersion();
 }
+
+const char *pulsar_message_get_producer_name(pulsar_message_t *message) {
+    return message->message.getProducerName().c_str();
+}
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index 5cf478b..c9a8faa 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -242,9 +242,20 @@ TEST(BasicEndToEndTest, testProduceConsume) {
     Message receivedMsg;
     consumer.receive(receivedMsg);
     ASSERT_EQ(content, receivedMsg.getDataAsString());
+    ASSERT_FALSE(receivedMsg.getProducerName().empty());
+    ASSERT_EQ(ResultOk, producer.close());
+
+    ProducerConfiguration conf;
+    conf.setProducerName("test-producer");
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer));
+    producer.send(MessageBuilder().setContent("msg-2-content").build());
+    consumer.receive(receivedMsg);
+    ASSERT_EQ("msg-2-content", receivedMsg.getDataAsString());
+    ASSERT_EQ("test-producer", receivedMsg.getProducerName());
+    consumer.acknowledge(receivedMsg);
     ASSERT_EQ(ResultOk, consumer.unsubscribe());
     ASSERT_EQ(ResultOk, consumer.close());
-    ASSERT_EQ(ResultOk, producer.close());
+    ASSERT_EQ(ResultOk, consumer.close());
     ASSERT_EQ(ResultOk, client.close());
 }
 
diff --git a/tests/MessageTest.cc b/tests/MessageTest.cc
index 513ea8d..688cb33 100644
--- a/tests/MessageTest.cc
+++ b/tests/MessageTest.cc
@@ -42,6 +42,7 @@ TEST(MessageTest, testMessageContents) {
     ASSERT_NE(myContents.c_str(), (char*)msg.getData());
     ASSERT_EQ(myContents, msg.getDataAsString());
     ASSERT_EQ(std::string("mycontents").length(), msg.getLength());
+    ASSERT_TRUE(msg.getProducerName().empty());
 }
 
 TEST(MessageTest, testAllocatedContents) {
diff --git a/tests/c/c_BasicEndToEndTest.cc b/tests/c/c_BasicEndToEndTest.cc
index b319727..34acd14 100644
--- a/tests/c/c_BasicEndToEndTest.cc
+++ b/tests/c/c_BasicEndToEndTest.cc
@@ -34,6 +34,7 @@ struct receive_ctx {
     pulsar_result result;
     pulsar_consumer_t *consumer;
     char *data;
+    char *producer_name;
     std::promise<void> *promise;
 };
 
@@ -57,6 +58,9 @@ static void receive_callback(pulsar_result async_result, pulsar_message_t *msg,
         const char *data = (const char *)pulsar_message_get_data(msg);
         receive_ctx->data = (char *)malloc(strlen(data) * sizeof(char) + 1);
         strcpy(receive_ctx->data, data);
+        const char *producer_name = pulsar_message_get_producer_name(msg);
+        receive_ctx->producer_name = (char *)malloc(strlen(producer_name) * sizeof(char) + 1);
+        strcpy(receive_ctx->producer_name, producer_name);
     }
     receive_ctx->promise->set_value();
     pulsar_message_free(msg);
@@ -71,6 +75,7 @@ TEST(c_BasicEndToEndTest, testAsyncProduceConsume) {
     pulsar_client_t *client = pulsar_client_create(lookup_url, conf);
 
     pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create();
+    pulsar_producer_configuration_set_producer_name(producer_conf, "test-producer");
     pulsar_producer_t *producer;
     pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer);
     ASSERT_EQ(pulsar_result_Ok, result);
@@ -101,12 +106,14 @@ TEST(c_BasicEndToEndTest, testAsyncProduceConsume) {
     // receive asynchronously
     std::promise<void> receive_promise;
     std::future<void> receive_future = receive_promise.get_future();
-    struct receive_ctx receive_ctx = {pulsar_result_UnknownError, consumer, NULL, &receive_promise};
+    struct receive_ctx receive_ctx = {pulsar_result_UnknownError, consumer, NULL, NULL, &receive_promise};
     pulsar_consumer_receive_async(consumer, receive_callback, &receive_ctx);
     receive_future.get();
     ASSERT_EQ(pulsar_result_Ok, receive_ctx.result);
+    ASSERT_STREQ("test-producer", receive_ctx.producer_name);
     ASSERT_STREQ(content, receive_ctx.data);
-    delete receive_ctx.data;
+    free(receive_ctx.data);
+    free(receive_ctx.producer_name);
 
     ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_unsubscribe(consumer));
     ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_close(consumer));

Reply via email to