This is an automated email from the ASF dual-hosted git repository.

BewareMyPower pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new f2c0fec  feat: expose replicated_from proto field to Message (#583)
f2c0fec is described below

commit f2c0fecd388318d7a940cc1aabcd3adead7d5879
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Jun 1 18:05:31 2026 +0800

    feat: expose replicated_from proto field to Message (#583)
---
 include/pulsar/Message.h   |  8 ++++++++
 include/pulsar/c/message.h |  9 +++++++++
 lib/Message.cc             |  7 +++++++
 lib/c/c_Message.cc         |  5 +++++
 tests/MessageTest.cc       | 12 ++++++++++++
 tests/c/c_MessageTest.cc   | 16 ++++++++++++++++
 6 files changed, 57 insertions(+)

diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index b92ec6a..1c906e8 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -220,6 +220,14 @@ class PULSAR_PUBLIC Message {
      */
     const std::string& getProducerName() const noexcept;
 
+    /**
+     * Get the source cluster from which the message was replicated.
+     *
+     * @return the optional pointer to the source cluster name if the message was replicated, the pointer is
+     * valid as the Message instance is alive
+     */
+    std::optional<const std::string*> getReplicatedFrom() const;
+
     /**
      * @return the optional encryption context that is present when the message is encrypted, the pointer is
      * valid as the Message instance is alive
diff --git a/include/pulsar/c/message.h b/include/pulsar/c/message.h
index 8aceca5..af22639 100644
--- a/include/pulsar/c/message.h
+++ b/include/pulsar/c/message.h
@@ -230,6 +230,15 @@ PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message,
  */
 PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t *message);
 
+/**
+ * Get the source cluster from which the message was replicated.
+ *
+ * The pointer points to internal storage owned by the message wrapper, so the caller should not free it.
+ *
+ * @return the source cluster name, or NULL if the message is not replicated
+ */
+PULSAR_PUBLIC const char *pulsar_message_get_replicated_from(pulsar_message_t *message);
+
 /**
  * Check if the message has a null value.
  *
diff --git a/lib/Message.cc b/lib/Message.cc
index f4e6d69..8bde683 100644
--- a/lib/Message.cc
+++ b/lib/Message.cc
@@ -239,6 +239,13 @@ const std::string& Message::getProducerName() const noexcept {
     return impl_->metadata.producer_name();
 }
 
+std::optional<const std::string*> Message::getReplicatedFrom() const {
+    if (!impl_ || !impl_->metadata.has_replicated_from()) {
+        return std::nullopt;
+    }
+    return &impl_->metadata.replicated_from();
+}
+
 std::optional<const EncryptionContext*> Message::getEncryptionContext() const {
     if (!impl_ || !impl_->encryptionContext_.has_value()) {
         return std::nullopt;
diff --git a/lib/c/c_Message.cc b/lib/c/c_Message.cc
index 51afa8e..6d36309 100644
--- a/lib/c/c_Message.cc
+++ b/lib/c/c_Message.cc
@@ -151,4 +151,9 @@ const char *pulsar_message_get_producer_name(pulsar_message_t *message) {
     return message->message.getProducerName().c_str();
 }
 
+const char *pulsar_message_get_replicated_from(pulsar_message_t *message) {
+    const auto replicatedFrom = message->message.getReplicatedFrom();
+    return replicatedFrom ? replicatedFrom.value()->c_str() : nullptr;
+}
+
 int pulsar_message_has_null_value(pulsar_message_t *message) { return message->message.hasNullValue(); }
diff --git a/tests/MessageTest.cc b/tests/MessageTest.cc
index 0ffcc41..7f1ae4d 100644
--- a/tests/MessageTest.cc
+++ b/tests/MessageTest.cc
@@ -22,6 +22,7 @@
 
 #include <string>
 
+#include "PulsarFriend.h"
 #include "lib/MessageImpl.h"
 
 using namespace pulsar;
@@ -154,6 +155,17 @@ TEST(MessageTest, testGetTopicNameOnProducerMessage) {
     ASSERT_TRUE(msg.getTopicName().empty());
 }
 
+TEST(MessageTest, testReplicationMetadataAccessors) {
+    auto msg = MessageBuilder().setContent("test").build();
+    ASSERT_FALSE(msg.getReplicatedFrom().has_value());
+
+    PulsarFriend::getMessageMetadata(msg).set_replicated_from("us-west1");
+
+    const auto replicatedFrom = msg.getReplicatedFrom();
+    ASSERT_TRUE(replicatedFrom.has_value());
+    ASSERT_EQ(*replicatedFrom.value(), "us-west1");
+}
+
 TEST(MessageTest, testNullValueMessage) {
     {
         auto msg = MessageBuilder().setContent("test").build();
diff --git a/tests/c/c_MessageTest.cc b/tests/c/c_MessageTest.cc
index a64a990..7a2ee50 100644
--- a/tests/c/c_MessageTest.cc
+++ b/tests/c/c_MessageTest.cc
@@ -20,6 +20,8 @@
 #include <lib/c/c_structs.h>
 #include <pulsar/c/message.h>
 
+#include "../PulsarFriend.h"
+
 TEST(c_MessageTest, MessageCopy) {
     pulsar_message_t *from = pulsar_message_create();
     pulsar_message_set_content(from, "hello", 5);
@@ -32,3 +34,17 @@ TEST(c_MessageTest, MessageCopy) {
     pulsar_message_free(from);
     pulsar_message_free(to);
 }
+
+TEST(c_MessageTest, ReplicationMetadataAccessors) {
+    pulsar_message_t *message = pulsar_message_create();
+    pulsar_message_set_content(message, "hello", 5);
+    message->message = message->builder.build();
+
+    ASSERT_EQ(nullptr, pulsar_message_get_replicated_from(message));
+
+    PulsarFriend::getMessageMetadata(message->message).set_replicated_from("us-west1");
+
+    ASSERT_STREQ("us-west1", pulsar_message_get_replicated_from(message));
+
+    pulsar_message_free(message);
+}

Reply via email to