This is an automated email from the ASF dual-hosted git repository.

lizhimins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 66b340d9 [C++] Support priority message (#1241)
66b340d9 is described below

commit 66b340d9a7568f2351e7a0dda5f041e90f971f65
Author: zhaohai <[email protected]>
AuthorDate: Wed May 20 15:36:00 2026 +0800

    [C++] Support priority message (#1241)
---
 README-CN.md                                       |  2 +-
 README.md                                          |  2 +-
 cpp/examples/BUILD.bazel                           | 11 +++
 cpp/examples/CMakeLists.txt                        |  1 +
 .../ExampleProducerWithPriorityMessage.cpp         | 63 +++++++++++++++
 cpp/include/rocketmq/Message.h                     | 11 +++
 cpp/proto/apache/rocketmq/v2/definition.proto      | 12 +++
 cpp/source/base/Message.cpp                        |  5 ++
 cpp/source/client/ClientManagerImpl.cpp            |  5 ++
 cpp/source/rocketmq/ProducerImpl.cpp               |  4 +
 cpp/source/rocketmq/tests/BUILD.bazel              |  8 ++
 cpp/source/rocketmq/tests/PriorityMessageTest.cpp  | 93 ++++++++++++++++++++++
 12 files changed, 215 insertions(+), 2 deletions(-)

diff --git a/README-CN.md b/README-CN.md
index 67dd9e92..96caedf9 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -28,7 +28,7 @@
 | Push consumer with concurrent message listener |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
 | Push consumer with FIFO message listener       |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
 | Push consumer with FIFO consume accelerator    |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
-| Priority Message                               |   ✅   |   🚧   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
+| Priority Message                               |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
 
 ## 先决条件和构建
 
diff --git a/README.md b/README.md
index b33aa49d..b7cf00ab 100644
--- a/README.md
+++ b/README.md
@@ -28,7 +28,7 @@ Provide cloud-native and robust solutions for Java, C++, C#, 
Golang, Rust and al
 | Push consumer with concurrent message listener |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
 | Push consumer with FIFO message listener       |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
 | Push consumer with FIFO consume accelerator    |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
-| Priority Message                               |   ✅   |   🚧   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
+| Priority Message                               |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
 
 ## Prerequisite and Build
 
diff --git a/cpp/examples/BUILD.bazel b/cpp/examples/BUILD.bazel
index 771b0e04..44efb924 100644
--- a/cpp/examples/BUILD.bazel
+++ b/cpp/examples/BUILD.bazel
@@ -60,6 +60,17 @@ cc_binary(
     ],
 )
 
+cc_binary(
+    name = "example_producer_with_priority_message",
+    srcs = [
+        "ExampleProducerWithPriorityMessage.cpp",
+    ],
+    deps = [
+        "//source/rocketmq:rocketmq_library",
+        "@com_github_gflags_gflags//:gflags",
+    ],
+)
+
 cc_binary(
     name = "example_producer_with_transactional_message",
     srcs = [
diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt
index 27304477..8de1b9df 100644
--- a/cpp/examples/CMakeLists.txt
+++ b/cpp/examples/CMakeLists.txt
@@ -8,6 +8,7 @@ add_example(example_fifo_producer ExampleFifoProducer.cpp)
 add_example(example_producer_with_async ExampleProducerWithAsync.cpp)
 add_example(example_producer_with_fifo_message 
ExampleProducerWithFifoMessage.cpp)
 add_example(example_producer_with_timed_message 
ExampleProducerWithTimedMessage.cpp)
+add_example(example_producer_with_priority_message 
ExampleProducerWithPriorityMessage.cpp)
 add_example(example_producer_with_transactional_message 
ExampleProducerWithTransactionalMessage.cpp)
 add_example(example_push_consumer ExamplePushConsumer.cpp)
 add_example(example_simple_consumer ExampleSimpleConsumer.cpp)
\ No newline at end of file
diff --git a/cpp/examples/ExampleProducerWithPriorityMessage.cpp 
b/cpp/examples/ExampleProducerWithPriorityMessage.cpp
new file mode 100644
index 00000000..2da14db8
--- /dev/null
+++ b/cpp/examples/ExampleProducerWithPriorityMessage.cpp
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <iostream>
+#include <thread>
+
+#include "rocketmq/Producer.h"
+#include "rocketmq/Message.h"
+
+using namespace rocketmq;
+
+int main() {
+  // Create producer configuration with topic
+  auto producer = Producer::newBuilder()
+                      .withConfiguration(Configuration::newBuilder()
+                                             .withEndpoints("127.0.0.1:9876")
+                                             .build())
+                      .withTopics({"PriorityMessageTopic"})
+                      .build();
+  
+  // Send priority messages with different priority levels
+  for (int i = 0; i < 5; ++i) {
+    // Create a priority message
+    auto message = Message::newBuilder()
+                       .withTopic("PriorityMessageTopic")
+                       .withTag("PriorityTag")
+                       .withKeys({"key1", "key2"})
+                       .withBody("This is a priority message with level " + 
std::to_string(i))
+                       .withPriority(i)  // Set priority level (higher value = 
higher priority)
+                       .build();
+    
+    // Send the message
+    std::error_code ec;
+    auto send_receipt = producer.send(std::move(message), ec);
+    if (ec) {
+      std::cerr << "Failed to send message: " << ec.message() << std::endl;
+    } else {
+      std::cout << "Sent priority message with level " << i 
+                << ", message ID: " << send_receipt.message_id << std::endl;
+    }
+    
+    // Small delay between sends
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+  }
+  
+  std::cout << "Producer finished sending messages" << std::endl;
+  return 0;
+}
diff --git a/cpp/include/rocketmq/Message.h b/cpp/include/rocketmq/Message.h
index 3a917b1f..01bc58ed 100644
--- a/cpp/include/rocketmq/Message.h
+++ b/cpp/include/rocketmq/Message.h
@@ -100,6 +100,14 @@ public:
     return group_;
   }
 
+  std::int32_t priority() const {
+    return priority_;
+  }
+
+  void priority(std::int32_t priority) {
+    priority_ = priority;
+  }
+
   const Extension& extension() const {
     return extension_;
   }
@@ -127,6 +135,7 @@ private:
   std::string body_;
   std::unordered_map<std::string, std::string> properties_;
   std::string group_;
+  std::int32_t priority_{-1};
   Extension extension_;
 };
 
@@ -151,6 +160,8 @@ public:
 
   MessageBuilder& withGroup(std::string group);
 
+  MessageBuilder& withPriority(std::int32_t priority);
+
   MessageBuilder& withProperties(std::unordered_map<std::string, std::string> 
properties);
 
   /**
diff --git a/cpp/proto/apache/rocketmq/v2/definition.proto 
b/cpp/proto/apache/rocketmq/v2/definition.proto
index 468c4105..c0762b0c 100644
--- a/cpp/proto/apache/rocketmq/v2/definition.proto
+++ b/cpp/proto/apache/rocketmq/v2/definition.proto
@@ -146,6 +146,12 @@ enum MessageType {
   // Messages that are transactional. Only committed messages are delivered to
   // subscribers.
   TRANSACTION = 4;
+
+  // lite topic
+  LITE = 5;
+
+  // Messages that lower prioritised ones may need to wait for higher priority 
messages to be processed first
+  PRIORITY = 6;
 }
 
 enum DigestType {
@@ -270,6 +276,12 @@ message SystemProperties {
 
   // Information to identify whether this message is from dead letter queue.
   optional DeadLetterQueue dead_letter_queue = 20;
+
+  // lite topic
+  optional string lite_topic = 21;
+
+  // Priority of message, which is optional
+  optional int32 priority = 22;
 }
 
 message DeadLetterQueue {
diff --git a/cpp/source/base/Message.cpp b/cpp/source/base/Message.cpp
index 8d82bd3f..1fa18136 100644
--- a/cpp/source/base/Message.cpp
+++ b/cpp/source/base/Message.cpp
@@ -64,6 +64,11 @@ MessageBuilder& MessageBuilder::withGroup(std::string group) 
{
   return *this;
 }
 
+MessageBuilder& MessageBuilder::withPriority(std::int32_t priority) {
+  message_->priority_ = priority;
+  return *this;
+}
+
 MessageBuilder& MessageBuilder::withProperties(std::unordered_map<std::string, 
std::string> properties) {
   message_->properties_ = std::move(properties);
   return *this;
diff --git a/cpp/source/client/ClientManagerImpl.cpp 
b/cpp/source/client/ClientManagerImpl.cpp
index 9c6b7a0e..9f6d268e 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -813,6 +813,11 @@ MessageConstSharedPtr ClientManagerImpl::wrapMessage(const 
rmq::Message& item) {
     builder.withGroup(system_properties.message_group());
   }
 
+  // Priority
+  if (system_properties.has_priority()) {
+    builder.withPriority(system_properties.priority());
+  }
+
   // Message-Id
   const auto& message_id = system_properties.message_id();
   builder.withId(message_id);
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp 
b/cpp/source/rocketmq/ProducerImpl.cpp
index b7dd82e8..802c151c 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -155,10 +155,14 @@ void ProducerImpl::wrapSendMessageRequest(const Message& 
message, SendMessageReq
 
   system_properties->set_born_host(UtilAll::hostname());
 
+  // Determine message type based on properties
   if (message.deliveryTimestamp().time_since_epoch().count()) {
     system_properties->set_message_type(rmq::MessageType::DELAY);
   } else if (!message.group().empty()) {
     system_properties->set_message_type(rmq::MessageType::FIFO);
+  } else if (message.priority() >= 0) {
+    system_properties->set_message_type(rmq::MessageType::PRIORITY);
+    system_properties->set_priority(message.priority());
   } else if (message.extension().transactional) {
     system_properties->set_message_type(rmq::MessageType::TRANSACTION);
   } else {
diff --git a/cpp/source/rocketmq/tests/BUILD.bazel 
b/cpp/source/rocketmq/tests/BUILD.bazel
index 74751c35..1c9e2aa2 100644
--- a/cpp/source/rocketmq/tests/BUILD.bazel
+++ b/cpp/source/rocketmq/tests/BUILD.bazel
@@ -67,4 +67,12 @@ cc_test(
         "OptionalTest.cpp",
     ],
     deps = base_deps
+)
+
+cc_test(
+    name = "priority_message_test",
+    srcs = [
+        "PriorityMessageTest.cpp",
+    ],
+    deps = base_deps
 )
\ No newline at end of file
diff --git a/cpp/source/rocketmq/tests/PriorityMessageTest.cpp 
b/cpp/source/rocketmq/tests/PriorityMessageTest.cpp
new file mode 100644
index 00000000..5f57001e
--- /dev/null
+++ b/cpp/source/rocketmq/tests/PriorityMessageTest.cpp
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+
+#include "rocketmq/Message.h"
+#include "gtest/gtest.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+TEST(PriorityMessageTest, testPriorityMessageBuilder) {
+  // Test creating a priority message with builder
+  auto message = Message::newBuilder()
+                     .withTopic("PriorityTopic")
+                     .withBody("Test priority message")
+                     .withTag("PriorityTag")
+                     .withKeys({"key1", "key2"})
+                     .withPriority(5)
+                     .build();
+
+  ASSERT_NE(message, nullptr);
+  EXPECT_EQ(message->topic(), "PriorityTopic");
+  EXPECT_EQ(message->body(), "Test priority message");
+  EXPECT_EQ(message->tag(), "PriorityTag");
+  EXPECT_EQ(message->priority(), 5);
+  
+  auto& keys = message->keys();
+  ASSERT_EQ(keys.size(), 2);
+  EXPECT_EQ(keys[0], "key1");
+  EXPECT_EQ(keys[1], "key2");
+}
+
+TEST(PriorityMessageTest, testDefaultPriority) {
+  // Test that default priority is -1 (not set)
+  auto message = Message::newBuilder()
+                     .withTopic("NormalTopic")
+                     .withBody("Normal message")
+                     .build();
+
+  ASSERT_NE(message, nullptr);
+  EXPECT_EQ(message->priority(), -1);
+}
+
+TEST(PriorityMessageTest, testDifferentPriorityLevels) {
+  // Test different priority levels
+  for (int i = 0; i <= 10; ++i) {
+    auto message = Message::newBuilder()
+                       .withTopic("PriorityTopic")
+                       .withBody("Message with priority " + std::to_string(i))
+                       .withPriority(i)
+                       .build();
+
+    ASSERT_NE(message, nullptr);
+    EXPECT_EQ(message->priority(), i);
+  }
+}
+
+TEST(PriorityMessageTest, testPriorityMessageWithProperties) {
+  // Test priority message with custom properties
+  std::unordered_map<std::string, std::string> properties;
+  properties["custom_key"] = "custom_value";
+  properties["another_key"] = "another_value";
+
+  auto message = Message::newBuilder()
+                     .withTopic("PriorityTopic")
+                     .withBody("Priority message with properties")
+                     .withPriority(8)
+                     .withProperties(properties)
+                     .build();
+
+  ASSERT_NE(message, nullptr);
+  EXPECT_EQ(message->priority(), 8);
+  
+  auto& msg_properties = message->properties();
+  EXPECT_EQ(msg_properties.size(), 2);
+  EXPECT_EQ(msg_properties.at("custom_key"), "custom_value");
+  EXPECT_EQ(msg_properties.at("another_key"), "another_value");
+}
+
+ROCKETMQ_NAMESPACE_END

Reply via email to