This is an automated email from the ASF dual-hosted git repository.
lizhimins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 66b340d9 [C++] Support priority message (#1241)
66b340d9 is described below
commit 66b340d9a7568f2351e7a0dda5f041e90f971f65
Author: zhaohai <[email protected]>
AuthorDate: Wed May 20 15:36:00 2026 +0800
[C++] Support priority message (#1241)
---
README-CN.md | 2 +-
README.md | 2 +-
cpp/examples/BUILD.bazel | 11 +++
cpp/examples/CMakeLists.txt | 1 +
.../ExampleProducerWithPriorityMessage.cpp | 63 +++++++++++++++
cpp/include/rocketmq/Message.h | 11 +++
cpp/proto/apache/rocketmq/v2/definition.proto | 12 +++
cpp/source/base/Message.cpp | 5 ++
cpp/source/client/ClientManagerImpl.cpp | 5 ++
cpp/source/rocketmq/ProducerImpl.cpp | 4 +
cpp/source/rocketmq/tests/BUILD.bazel | 8 ++
cpp/source/rocketmq/tests/PriorityMessageTest.cpp | 93 ++++++++++++++++++++++
12 files changed, 215 insertions(+), 2 deletions(-)
diff --git a/README-CN.md b/README-CN.md
index 67dd9e92..96caedf9 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -28,7 +28,7 @@
| Push consumer with concurrent message listener | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Push consumer with FIFO message listener | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Push consumer with FIFO consume accelerator | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
-| Priority Message | ✅ | 🚧 | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
+| Priority Message | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
## 先决条件和构建
diff --git a/README.md b/README.md
index b33aa49d..b7cf00ab 100644
--- a/README.md
+++ b/README.md
@@ -28,7 +28,7 @@ Provide cloud-native and robust solutions for Java, C++, C#, Golang, Rust and al
| Push consumer with concurrent message listener | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Push consumer with FIFO message listener | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Push consumer with FIFO consume accelerator | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
-| Priority Message | ✅ | 🚧 | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
+| Priority Message | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
## Prerequisite and Build
diff --git a/cpp/examples/BUILD.bazel b/cpp/examples/BUILD.bazel
index 771b0e04..44efb924 100644
--- a/cpp/examples/BUILD.bazel
+++ b/cpp/examples/BUILD.bazel
@@ -60,6 +60,17 @@ cc_binary(
],
)
+# Example binary built from ExampleProducerWithPriorityMessage.cpp.
+cc_binary(
+    name = "example_producer_with_priority_message",
+    srcs = ["ExampleProducerWithPriorityMessage.cpp"],
+    deps = [
+        "//source/rocketmq:rocketmq_library",
+        "@com_github_gflags_gflags//:gflags",
+    ],
+)
+
cc_binary(
name = "example_producer_with_transactional_message",
srcs = [
diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt
index 27304477..8de1b9df 100644
--- a/cpp/examples/CMakeLists.txt
+++ b/cpp/examples/CMakeLists.txt
@@ -8,6 +8,7 @@ add_example(example_fifo_producer ExampleFifoProducer.cpp)
add_example(example_producer_with_async ExampleProducerWithAsync.cpp)
add_example(example_producer_with_fifo_message
ExampleProducerWithFifoMessage.cpp)
add_example(example_producer_with_timed_message
ExampleProducerWithTimedMessage.cpp)
+add_example(example_producer_with_priority_message ExampleProducerWithPriorityMessage.cpp)
add_example(example_producer_with_transactional_message
ExampleProducerWithTransactionalMessage.cpp)
add_example(example_push_consumer ExamplePushConsumer.cpp)
add_example(example_simple_consumer ExampleSimpleConsumer.cpp)
\ No newline at end of file
diff --git a/cpp/examples/ExampleProducerWithPriorityMessage.cpp
b/cpp/examples/ExampleProducerWithPriorityMessage.cpp
new file mode 100644
index 00000000..2da14db8
--- /dev/null
+++ b/cpp/examples/ExampleProducerWithPriorityMessage.cpp
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <iostream>
+#include <thread>
+
+#include "rocketmq/Producer.h"
+#include "rocketmq/Message.h"
+
+using namespace rocketmq;
+
+/**
+ * Example entry point: builds a producer for "PriorityMessageTopic" and sends
+ * five messages whose priority levels run from 0 to 4, printing the outcome of
+ * every send.
+ *
+ * @return Always 0; a failed send is reported on stderr but does not stop the loop.
+ */
+int main() {
+  // Build a producer bound to the endpoint and topic used by this example
+  auto producer = Producer::newBuilder()
+                      .withConfiguration(Configuration::newBuilder()
+                                             .withEndpoints("127.0.0.1:9876")
+                                             .build())
+                      .withTopics({"PriorityMessageTopic"})
+                      .build();
+
+  // Send priority messages with different priority levels
+  for (int i = 0; i < 5; ++i) {
+    // Create a priority message; the loop index doubles as the priority level
+    // (higher value = higher priority)
+    auto message = Message::newBuilder()
+                       .withTopic("PriorityMessageTopic")
+                       .withTag("PriorityTag")
+                       .withKeys({"key1", "key2"})
+                       .withBody("This is a priority message with level " + std::to_string(i))
+                       .withPriority(i)
+                       .build();
+
+    // Send the message; errors are reported through `ec`
+    std::error_code ec;
+    auto send_receipt = producer.send(std::move(message), ec);
+    if (ec) {
+      std::cerr << "Failed to send message: " << ec.message() << std::endl;
+    } else {
+      std::cout << "Sent priority message with level " << i
+                << ", message ID: " << send_receipt.message_id << std::endl;
+    }
+
+    // Small delay between sends
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+  }
+
+  std::cout << "Producer finished sending messages" << std::endl;
+  return 0;
+}
diff --git a/cpp/include/rocketmq/Message.h b/cpp/include/rocketmq/Message.h
index 3a917b1f..01bc58ed 100644
--- a/cpp/include/rocketmq/Message.h
+++ b/cpp/include/rocketmq/Message.h
@@ -100,6 +100,14 @@ public:
return group_;
}
+ /** Returns the priority stored on this message; the member is initialised to -1 when no priority was assigned. */
+ std::int32_t priority() const {
+ return priority_;
+ }
+
+ /** Stores `priority` as this message's priority value, replacing any previous value. */
+ void priority(std::int32_t priority) {
+ priority_ = priority;
+ }
+
const Extension& extension() const {
return extension_;
}
@@ -127,6 +135,7 @@ private:
std::string body_;
std::unordered_map<std::string, std::string> properties_;
std::string group_;
+ std::int32_t priority_{-1};
Extension extension_;
};
@@ -151,6 +160,8 @@ public:
MessageBuilder& withGroup(std::string group);
+ MessageBuilder& withPriority(std::int32_t priority);
+
MessageBuilder& withProperties(std::unordered_map<std::string, std::string>
properties);
/**
diff --git a/cpp/proto/apache/rocketmq/v2/definition.proto
b/cpp/proto/apache/rocketmq/v2/definition.proto
index 468c4105..c0762b0c 100644
--- a/cpp/proto/apache/rocketmq/v2/definition.proto
+++ b/cpp/proto/apache/rocketmq/v2/definition.proto
@@ -146,6 +146,12 @@ enum MessageType {
// Messages that are transactional. Only committed messages are delivered to
// subscribers.
TRANSACTION = 4;
+
+ // lite topic
+ LITE = 5;
+
+ // Messages that carry a priority: lower-priority messages may have to wait until higher-priority ones have been processed.
+ PRIORITY = 6;
}
enum DigestType {
@@ -270,6 +276,12 @@ message SystemProperties {
// Information to identify whether this message is from dead letter queue.
optional DeadLetterQueue dead_letter_queue = 20;
+
+ // lite topic
+ optional string lite_topic = 21;
+
+ // Priority of message, which is optional
+ optional int32 priority = 22;
}
message DeadLetterQueue {
diff --git a/cpp/source/base/Message.cpp b/cpp/source/base/Message.cpp
index 8d82bd3f..1fa18136 100644
--- a/cpp/source/base/Message.cpp
+++ b/cpp/source/base/Message.cpp
@@ -64,6 +64,11 @@ MessageBuilder& MessageBuilder::withGroup(std::string group)
{
return *this;
}
+/** Sets the priority on the message under construction and returns this builder so calls can be chained. */
+MessageBuilder& MessageBuilder::withPriority(std::int32_t priority) {
+ message_->priority_ = priority;
+ return *this;
+}
+
MessageBuilder& MessageBuilder::withProperties(std::unordered_map<std::string,
std::string> properties) {
message_->properties_ = std::move(properties);
return *this;
diff --git a/cpp/source/client/ClientManagerImpl.cpp
b/cpp/source/client/ClientManagerImpl.cpp
index 9c6b7a0e..9f6d268e 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -813,6 +813,11 @@ MessageConstSharedPtr ClientManagerImpl::wrapMessage(const
rmq::Message& item) {
builder.withGroup(system_properties.message_group());
}
+ // Priority
+ if (system_properties.has_priority()) {
+ builder.withPriority(system_properties.priority());
+ }
+
// Message-Id
const auto& message_id = system_properties.message_id();
builder.withId(message_id);
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp
b/cpp/source/rocketmq/ProducerImpl.cpp
index b7dd82e8..802c151c 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -155,10 +155,14 @@ void ProducerImpl::wrapSendMessageRequest(const Message&
message, SendMessageReq
system_properties->set_born_host(UtilAll::hostname());
+ // Determine message type based on properties
if (message.deliveryTimestamp().time_since_epoch().count()) {
system_properties->set_message_type(rmq::MessageType::DELAY);
} else if (!message.group().empty()) {
system_properties->set_message_type(rmq::MessageType::FIFO);
+ } else if (message.priority() >= 0) {
+ system_properties->set_message_type(rmq::MessageType::PRIORITY);
+ system_properties->set_priority(message.priority());
} else if (message.extension().transactional) {
system_properties->set_message_type(rmq::MessageType::TRANSACTION);
} else {
diff --git a/cpp/source/rocketmq/tests/BUILD.bazel
b/cpp/source/rocketmq/tests/BUILD.bazel
index 74751c35..1c9e2aa2 100644
--- a/cpp/source/rocketmq/tests/BUILD.bazel
+++ b/cpp/source/rocketmq/tests/BUILD.bazel
@@ -67,4 +67,12 @@ cc_test(
"OptionalTest.cpp",
],
deps = base_deps
+)
+
+# Test target built from PriorityMessageTest.cpp, using the shared base_deps dependency list.
+cc_test(
+ name = "priority_message_test",
+ srcs = [
+ "PriorityMessageTest.cpp",
+ ],
+ deps = base_deps
)
\ No newline at end of file
diff --git a/cpp/source/rocketmq/tests/PriorityMessageTest.cpp
b/cpp/source/rocketmq/tests/PriorityMessageTest.cpp
new file mode 100644
index 00000000..5f57001e
--- /dev/null
+++ b/cpp/source/rocketmq/tests/PriorityMessageTest.cpp
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+
+#include "rocketmq/Message.h"
+#include "gtest/gtest.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+/**
+ * Verifies that every field handed to the builder, including the priority,
+ * can be read back from the built message.
+ */
+TEST(PriorityMessageTest, testPriorityMessageBuilder) {
+  auto message = Message::newBuilder()
+                     .withTopic("PriorityTopic")
+                     .withBody("Test priority message")
+                     .withTag("PriorityTag")
+                     .withKeys({"key1", "key2"})
+                     .withPriority(5)
+                     .build();
+
+  ASSERT_NE(message, nullptr);
+  EXPECT_EQ(message->topic(), "PriorityTopic");
+  EXPECT_EQ(message->body(), "Test priority message");
+  EXPECT_EQ(message->tag(), "PriorityTag");
+  EXPECT_EQ(message->priority(), 5);
+
+  const auto& keys = message->keys();
+  // Unsigned literal: size() presumably yields an unsigned size_type, and
+  // comparing it with a plain int raises -Wsign-compare inside the gtest
+  // comparison helper.
+  ASSERT_EQ(keys.size(), 2U);
+  EXPECT_EQ(keys[0], "key1");
+  EXPECT_EQ(keys[1], "key2");
+}
+
+/** A message built without withPriority() must report the default priority of -1. */
+TEST(PriorityMessageTest, testDefaultPriority) {
+  auto plain_message = Message::newBuilder().withTopic("NormalTopic").withBody("Normal message").build();
+
+  ASSERT_NE(plain_message, nullptr);
+  EXPECT_EQ(plain_message->priority(), -1);
+}
+
+/** Each priority level from 0 through 10 must be readable from the built message. */
+TEST(PriorityMessageTest, testDifferentPriorityLevels) {
+  for (int level = 0; level <= 10; ++level) {
+    auto message = Message::newBuilder()
+                       .withTopic("PriorityTopic")
+                       .withBody("Message with priority " + std::to_string(level))
+                       .withPriority(level)
+                       .build();
+
+    ASSERT_NE(message, nullptr);
+    EXPECT_EQ(message->priority(), level);
+  }
+}
+
+/**
+ * Verifies that a priority and a set of custom properties can be set on the
+ * same message and that both are readable afterwards.
+ */
+TEST(PriorityMessageTest, testPriorityMessageWithProperties) {
+  std::unordered_map<std::string, std::string> properties;
+  properties["custom_key"] = "custom_value";
+  properties["another_key"] = "another_value";
+
+  auto message = Message::newBuilder()
+                     .withTopic("PriorityTopic")
+                     .withBody("Priority message with properties")
+                     .withPriority(8)
+                     .withProperties(properties)
+                     .build();
+
+  ASSERT_NE(message, nullptr);
+  EXPECT_EQ(message->priority(), 8);
+
+  const auto& msg_properties = message->properties();
+  // Unsigned literal: size() presumably yields an unsigned size_type, and
+  // comparing it with a plain int raises -Wsign-compare inside the gtest
+  // comparison helper.
+  EXPECT_EQ(msg_properties.size(), 2U);
+  EXPECT_EQ(msg_properties.at("custom_key"), "custom_value");
+  EXPECT_EQ(msg_properties.at("another_key"), "another_value");
+}
+
+ROCKETMQ_NAMESPACE_END