This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 0b838e73 Release Python SDK 5.1.0 (#1167)
0b838e73 is described below
commit 0b838e7382e066a37df7a4b68e701fe182ec435e
Author: zhouli11 <[email protected]>
AuthorDate: Mon Jan 5 17:36:36 2026 +0800
Release Python SDK 5.1.0 (#1167)
---
README-CN.md | 8 +-
README.md | 8 +-
...er_example.py => lite_push_consumer_example.py} | 34 +++--
python/example/push_consumer_example.py | 16 +--
python/rocketmq/__init__.py | 5 +-
python/rocketmq/grpc_protocol/__init__.py | 7 +-
python/rocketmq/grpc_protocol/definition_pb2.py | 94 ++++++------
.../rocketmq/grpc_protocol/proto/definition.proto | 29 ++++
python/rocketmq/grpc_protocol/proto/service.proto | 28 ++++
python/rocketmq/grpc_protocol/service_pb2.py | 135 ++++++++---------
python/rocketmq/grpc_protocol/service_pb2_grpc.py | 34 +++++
python/rocketmq/v5/client/client.py | 28 ++--
.../rocketmq/v5/client/connection/rpc_channel.py | 19 ++-
python/rocketmq/v5/client/connection/rpc_client.py | 11 +-
python/rocketmq/v5/consumer/__init__.py | 7 +-
python/rocketmq/v5/consumer/consumer.py | 21 +--
python/rocketmq/v5/consumer/{ => push}/__init__.py | 4 +-
.../rocketmq/v5/consumer/{ => push}/consumption.py | 3 +-
.../v5/consumer/{ => push}/fifo_consumption.py | 11 +-
.../v5/consumer/push/lite_push_consumer.py | 160 +++++++++++++++++++++
.../v5/consumer/{ => push}/message_listener.py | 0
.../v5/consumer/{ => push}/push_consumer.py | 84 +++++++----
.../rocketmq/v5/consumer/{ => simple}/__init__.py | 5 -
.../v5/consumer/{ => simple}/simple_consumer.py | 18 +++
python/rocketmq/v5/exception/__init__.py | 8 +-
python/rocketmq/v5/exception/client_exception.py | 54 +++----
python/rocketmq/v5/model/message.py | 87 ++++++-----
python/rocketmq/v5/model/process_queue.py | 2 +-
python/rocketmq/v5/model/retry_policy.py | 60 +++++++-
python/rocketmq/v5/producer/producer.py | 67 ++++++---
.../rocketmq/v5/util/messaging_result_checker.py | 16 ++-
python/rocketmq/v5/util/misc.py | 2 +-
32 files changed, 736 insertions(+), 329 deletions(-)
diff --git a/README-CN.md b/README-CN.md
index 88fb3023..781a3b62 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -23,11 +23,11 @@
| Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
-| Producer with recalling timed/delay messages | ✅ | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
+| Producer with recalling timed/delay messages | ✅ | ✅ | 🚧 | 🚧 | 🚧 | ✅ | 🚧 | 🚧 |
| Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
-| Push consumer with concurrent message listener | ✅ | ✅ | ✅ | 🚧 | ✅ | 🚧 | 🚧 | 🚧 |
-| Push consumer with FIFO message listener | ✅ | ✅ | ✅ | 🚧 | ✅ | 🚧 | 🚧 | 🚧 |
-| Push consumer with FIFO consume accelerator | ✅ | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
+| Push consumer with concurrent message listener | ✅ | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 | 🚧 |
+| Push consumer with FIFO message listener | ✅ | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 | 🚧 |
+| Push consumer with FIFO consume accelerator | ✅ | ✅ | 🚧 | 🚧 | 🚧 | ✅ | 🚧 | 🚧 |
| Priority Message | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
## 先决条件和构建
diff --git a/README.md b/README.md
index 8f985f2d..ed166f46 100644
--- a/README.md
+++ b/README.md
@@ -23,11 +23,11 @@ Provide cloud-native and robust solutions for Java, C++,
C#, Golang, Rust and al
| Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
-| Producer with recalling timed/delay messages | ✅ | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
+| Producer with recalling timed/delay messages | ✅ | ✅ | 🚧 | 🚧 | 🚧 | ✅ | 🚧 | 🚧 |
| Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
-| Push consumer with concurrent message listener | ✅ | ✅ | ✅ | 🚧 | ✅ | 🚧 | 🚧 | 🚧 |
-| Push consumer with FIFO message listener | ✅ | ✅ | ✅ | 🚧 | ✅ | 🚧 | 🚧 | 🚧 |
-| Push consumer with FIFO consume accelerator | ✅ | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
+| Push consumer with concurrent message listener | ✅ | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 | 🚧 |
+| Push consumer with FIFO message listener | ✅ | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 | 🚧 |
+| Push consumer with FIFO consume accelerator | ✅ | ✅ | 🚧 | 🚧 | 🚧 | ✅ | 🚧 | 🚧 |
| Priority Message | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
## Prerequisite and Build
diff --git a/python/example/push_consumer_example.py
b/python/example/lite_push_consumer_example.py
similarity index 57%
copy from python/example/push_consumer_example.py
copy to python/example/lite_push_consumer_example.py
index c35b8f23..f398e474 100644
--- a/python/example/push_consumer_example.py
+++ b/python/example/lite_push_consumer_example.py
@@ -13,17 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from rocketmq import (ClientConfiguration, Credentials, FilterExpression,
- Message)
-from rocketmq.v5.consumer import PushConsumer
-from rocketmq.v5.consumer.message_listener import (ConsumeResult,
- MessageListener)
+from rocketmq import (ClientConfiguration, ConsumeResult, Credentials,
+ LitePushConsumer, Message, MessageListener)
-class TestMessageListener(MessageListener):
+class LiteTopicTestMessageListener(MessageListener):
def consume(self, message: Message) -> ConsumeResult:
- print(f"consume message, topic:{message.topic}, message_id:
{message.message_id}.")
+ print(f"consume message, topic:{message.topic},
lite_topic:{message.lite_topic}, message_id: {message.message_id}.")
return ConsumeResult.SUCCESS
@@ -36,21 +33,22 @@ if __name__ == '__main__':
config = ClientConfiguration(endpoints, credentials)
# with namespace
# config = ClientConfiguration(endpoints, credentials, "namespace")
- topic = "topic"
+ bind_topic = "topic"
consumer_group = "consumer-group"
- # in most case, you don't need to create too many consumers, singleton
pattern is recommended
- # close the simple consumer when you don't need it anymore
- push_consumer = PushConsumer(config, consumer_group,
TestMessageListener(), {topic: FilterExpression()})
+ # A LitePushConsumer can only be bound to one topic
+ lite_push_consumer = LitePushConsumer(config, consumer_group, bind_topic,
LiteTopicTestMessageListener())
try:
- push_consumer.startup()
+ lite_push_consumer.startup()
+ for i in range(0, 10):
+ lite_push_consumer.subscribe_lite("lite-test-" + str(i))
try:
input("Please Enter to Stop the Application.\r\n")
except Exception as e:
- print(f"{push_consumer} raise exception: {e}")
- push_consumer.shutdown()
- print(f"{push_consumer} shutdown.")
+ print(f"{lite_push_consumer} raise exception: {e}")
+ lite_push_consumer.shutdown()
+ print(f"{lite_push_consumer} shutdown.")
except Exception as e:
- print(f"{push_consumer} startup raise exception: {e}")
- push_consumer.shutdown()
- print(f"{push_consumer} shutdown.")
+ print(f"{lite_push_consumer} startup raise exception: {e}")
+ lite_push_consumer.shutdown()
+ print(f"{lite_push_consumer} shutdown.")
diff --git a/python/example/push_consumer_example.py
b/python/example/push_consumer_example.py
index c35b8f23..0baa5099 100644
--- a/python/example/push_consumer_example.py
+++ b/python/example/push_consumer_example.py
@@ -13,11 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from rocketmq import (ClientConfiguration, Credentials, FilterExpression,
- Message)
-from rocketmq.v5.consumer import PushConsumer
-from rocketmq.v5.consumer.message_listener import (ConsumeResult,
- MessageListener)
+from rocketmq import (ClientConfiguration, ConsumeResult, Credentials,
+ FilterExpression, Message, MessageListener, PushConsumer)
class TestMessageListener(MessageListener):
@@ -37,10 +34,10 @@ if __name__ == '__main__':
# with namespace
# config = ClientConfiguration(endpoints, credentials, "namespace")
topic = "topic"
- consumer_group = "consumer-group"
+ consumer_group = "consumer_group"
# in most case, you don't need to create too many consumers, singleton
pattern is recommended
- # close the simple consumer when you don't need it anymore
- push_consumer = PushConsumer(config, consumer_group,
TestMessageListener(), {topic: FilterExpression()})
+ # close the push consumer when you don't need it anymore
+ push_consumer = PushConsumer(config, consumer_group,
TestMessageListener(), {topic: FilterExpression(), })
try:
push_consumer.startup()
@@ -52,5 +49,6 @@ if __name__ == '__main__':
print(f"{push_consumer} shutdown.")
except Exception as e:
print(f"{push_consumer} startup raise exception: {e}")
- push_consumer.shutdown()
+ if push_consumer.is_running:
+ push_consumer.shutdown()
print(f"{push_consumer} shutdown.")
diff --git a/python/rocketmq/__init__.py b/python/rocketmq/__init__.py
index 420795b9..d5509716 100644
--- a/python/rocketmq/__init__.py
+++ b/python/rocketmq/__init__.py
@@ -16,8 +16,8 @@
from rocketmq.grpc_protocol import TransactionResolution
from .v5.client import ClientConfiguration, Credentials
-from .v5.consumer import (ConsumeResult, MessageListener, PushConsumer,
- SimpleConsumer)
+from .v5.consumer import (ConsumeResult, LitePushConsumer, MessageListener,
+ PushConsumer, SimpleConsumer)
from .v5.model import FilterExpression, Message, SendReceipt
from .v5.producer import Producer, Transaction, TransactionChecker
@@ -28,6 +28,7 @@ __all__ = [
"TransactionResolution", # noqa
"SimpleConsumer",
"PushConsumer",
+ "LitePushConsumer",
"MessageListener",
"ConsumeResult",
"Message",
diff --git a/python/rocketmq/grpc_protocol/__init__.py
b/python/rocketmq/grpc_protocol/__init__.py
index 5f3ff16c..cd6a0557 100644
--- a/python/rocketmq/grpc_protocol/__init__.py
+++ b/python/rocketmq/grpc_protocol/__init__.py
@@ -15,7 +15,7 @@
from .definition_pb2 import (Address, AddressScheme, Broker, # noqa
ClientType, Code, DigestType, Encoding,
Endpoints, # noqa
- FilterType, Language, MessageType, Metric, # noqa
+ FilterType, Language, LiteSubscriptionAction,
MessageType, Metric, # noqa
Permission, Publishing, Resource, Settings, # noqa
Status, Subscription, TransactionResolution, #
noqa
TransactionSource) # noqa
@@ -24,7 +24,8 @@ from .service_pb2 import (AckMessageEntry, AckMessageRequest,
# noqa
EndTransactionRequest, HeartbeatRequest, # noqa
NotifyClientTerminationRequest, QueryRouteRequest, #
noqa
ReceiveMessageRequest, SendMessageRequest, # noqa
- TelemetryCommand, RecallMessageRequest,
QueryAssignmentRequest, # noqa
+ SyncLiteSubscriptionRequest, TelemetryCommand, # noqa
+ RecallMessageRequest, QueryAssignmentRequest, # noqa
ForwardMessageToDeadLetterQueueRequest) # noqa
from .service_pb2_grpc import MessagingServiceStub
@@ -61,6 +62,8 @@ __all__ = [
"TelemetryCommand", # noqa
"RecallMessageRequest", # noqa
"QueryAssignmentRequest", # noqa
+ "SyncLiteSubscriptionRequest", # noqa
+ "LiteSubscriptionAction", # noqa
"ForwardMessageToDeadLetterQueueRequest", # noqa
"MessagingServiceStub",
]
diff --git a/python/rocketmq/grpc_protocol/definition_pb2.py
b/python/rocketmq/grpc_protocol/definition_pb2.py
index 9d85c8f1..924979a7 100644
--- a/python/rocketmq/grpc_protocol/definition_pb2.py
+++ b/python/rocketmq/grpc_protocol/definition_pb2.py
@@ -26,7 +26,7 @@ from google.protobuf import timestamp_pb2 as
google_dot_protobuf_dot_timestamp__
from google.protobuf import duration_pb2 as
google_dot_protobuf_dot_duration__pb2
-DESCRIPTOR =
_descriptor_pool.Default().AddSerializedFile(b'\n\x10\x64\x65\x66inition.proto\x12\x12\x61pache.rocketmq.v2\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/duration.proto\"T\n\x10\x46ilterExpression\x12,\n\x04type\x18\x01
\x01(\x0e\x32\x1e.apache.rocketmq.v2.FilterType\x12\x12\n\nexpression\x18\x02
\x01(\t\"\xbb\x01\n\x0bRetryPolicy\x12\x14\n\x0cmax_attempts\x18\x01
\x01(\x05\x12\x45\n\x13\x65xponential_backoff\x18\x02
\x01(\x0b\x32&.apache.rocketmq.v2.Exponent [...]
+DESCRIPTOR =
_descriptor_pool.Default().AddSerializedFile(b'\n\x10\x64\x65\x66inition.proto\x12\x12\x61pache.rocketmq.v2\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/duration.proto\"T\n\x10\x46ilterExpression\x12,\n\x04type\x18\x01
\x01(\x0e\x32\x1e.apache.rocketmq.v2.FilterType\x12\x12\n\nexpression\x18\x02
\x01(\t\"\xbb\x01\n\x0bRetryPolicy\x12\x14\n\x0cmax_attempts\x18\x01
\x01(\x05\x12\x45\n\x13\x65xponential_backoff\x18\x02
\x01(\x0b\x32&.apache.rocketmq.v2.Exponent [...]
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -36,30 +36,32 @@ if not _descriptor._USE_C_DESCRIPTORS:
_globals['DESCRIPTOR']._serialized_options =
b'\n\022apache.rocketmq.v2B\010MQDomainP\001\240\001\001\330\001\001\252\002\022Apache.Rocketmq.V2'
_globals['_MESSAGE_USERPROPERTIESENTRY']._loaded_options = None
_globals['_MESSAGE_USERPROPERTIESENTRY']._serialized_options = b'8\001'
- _globals['_TRANSACTIONRESOLUTION']._serialized_start=3943
- _globals['_TRANSACTIONRESOLUTION']._serialized_end=4032
- _globals['_TRANSACTIONSOURCE']._serialized_start=4034
- _globals['_TRANSACTIONSOURCE']._serialized_end=4121
- _globals['_PERMISSION']._serialized_start=4123
- _globals['_PERMISSION']._serialized_end=4210
- _globals['_FILTERTYPE']._serialized_start=4212
- _globals['_FILTERTYPE']._serialized_end=4271
- _globals['_ADDRESSSCHEME']._serialized_start=4273
- _globals['_ADDRESSSCHEME']._serialized_end=4357
- _globals['_MESSAGETYPE']._serialized_start=4359
- _globals['_MESSAGETYPE']._serialized_end=4452
- _globals['_DIGESTTYPE']._serialized_start=4454
- _globals['_DIGESTTYPE']._serialized_end=4525
- _globals['_CLIENTTYPE']._serialized_start=4527
- _globals['_CLIENTTYPE']._serialized_end=4641
- _globals['_ENCODING']._serialized_start=4643
- _globals['_ENCODING']._serialized_end=4703
- _globals['_CODE']._serialized_start=4706
- _globals['_CODE']._serialized_end=6056
- _globals['_LANGUAGE']._serialized_start=6059
- _globals['_LANGUAGE']._serialized_end=6232
- _globals['_QUERYOFFSETPOLICY']._serialized_start=6234
- _globals['_QUERYOFFSETPOLICY']._serialized_end=6292
+ _globals['_TRANSACTIONRESOLUTION']._serialized_start=4107
+ _globals['_TRANSACTIONRESOLUTION']._serialized_end=4196
+ _globals['_TRANSACTIONSOURCE']._serialized_start=4198
+ _globals['_TRANSACTIONSOURCE']._serialized_end=4285
+ _globals['_PERMISSION']._serialized_start=4287
+ _globals['_PERMISSION']._serialized_end=4374
+ _globals['_FILTERTYPE']._serialized_start=4376
+ _globals['_FILTERTYPE']._serialized_end=4435
+ _globals['_ADDRESSSCHEME']._serialized_start=4437
+ _globals['_ADDRESSSCHEME']._serialized_end=4521
+ _globals['_MESSAGETYPE']._serialized_start=4523
+ _globals['_MESSAGETYPE']._serialized_end=4626
+ _globals['_DIGESTTYPE']._serialized_start=4628
+ _globals['_DIGESTTYPE']._serialized_end=4699
+ _globals['_CLIENTTYPE']._serialized_start=4702
+ _globals['_CLIENTTYPE']._serialized_end=4866
+ _globals['_ENCODING']._serialized_start=4868
+ _globals['_ENCODING']._serialized_end=4928
+ _globals['_CODE']._serialized_start=4931
+ _globals['_CODE']._serialized_end=6380
+ _globals['_LANGUAGE']._serialized_start=6383
+ _globals['_LANGUAGE']._serialized_end=6556
+ _globals['_LITESUBSCRIPTIONACTION']._serialized_start=6558
+ _globals['_LITESUBSCRIPTIONACTION']._serialized_end=6658
+ _globals['_QUERYOFFSETPOLICY']._serialized_start=6660
+ _globals['_QUERYOFFSETPOLICY']._serialized_end=6718
_globals['_FILTEREXPRESSION']._serialized_start=105
_globals['_FILTEREXPRESSION']._serialized_end=189
_globals['_RETRYPOLICY']._serialized_start=192
@@ -83,25 +85,25 @@ if not _descriptor._USE_C_DESCRIPTORS:
_globals['_DIGEST']._serialized_start=1217
_globals['_DIGEST']._serialized_end=1289
_globals['_SYSTEMPROPERTIES']._serialized_start=1292
- _globals['_SYSTEMPROPERTIES']._serialized_end=2331
- _globals['_DEADLETTERQUEUE']._serialized_start=2333
- _globals['_DEADLETTERQUEUE']._serialized_end=2385
- _globals['_MESSAGE']._serialized_start=2388
- _globals['_MESSAGE']._serialized_end=2650
- _globals['_MESSAGE_USERPROPERTIESENTRY']._serialized_start=2597
- _globals['_MESSAGE_USERPROPERTIESENTRY']._serialized_end=2650
- _globals['_ASSIGNMENT']._serialized_start=2652
- _globals['_ASSIGNMENT']._serialized_end=2721
- _globals['_STATUS']._serialized_start=2723
- _globals['_STATUS']._serialized_end=2788
- _globals['_UA']._serialized_start=2790
- _globals['_UA']._serialized_end=2895
- _globals['_SETTINGS']._serialized_start=2898
- _globals['_SETTINGS']._serialized_end=3426
- _globals['_PUBLISHING']._serialized_start=3428
- _globals['_PUBLISHING']._serialized_end=3540
- _globals['_SUBSCRIPTION']._serialized_start=3543
- _globals['_SUBSCRIPTION']._serialized_end=3850
- _globals['_METRIC']._serialized_start=3852
- _globals['_METRIC']._serialized_end=3941
+ _globals['_SYSTEMPROPERTIES']._serialized_end=2371
+ _globals['_DEADLETTERQUEUE']._serialized_start=2373
+ _globals['_DEADLETTERQUEUE']._serialized_end=2425
+ _globals['_MESSAGE']._serialized_start=2428
+ _globals['_MESSAGE']._serialized_end=2690
+ _globals['_MESSAGE_USERPROPERTIESENTRY']._serialized_start=2637
+ _globals['_MESSAGE_USERPROPERTIESENTRY']._serialized_end=2690
+ _globals['_ASSIGNMENT']._serialized_start=2692
+ _globals['_ASSIGNMENT']._serialized_end=2761
+ _globals['_STATUS']._serialized_start=2763
+ _globals['_STATUS']._serialized_end=2828
+ _globals['_UA']._serialized_start=2830
+ _globals['_UA']._serialized_end=2935
+ _globals['_SETTINGS']._serialized_start=2938
+ _globals['_SETTINGS']._serialized_end=3466
+ _globals['_PUBLISHING']._serialized_start=3468
+ _globals['_PUBLISHING']._serialized_end=3580
+ _globals['_SUBSCRIPTION']._serialized_start=3583
+ _globals['_SUBSCRIPTION']._serialized_end=4014
+ _globals['_METRIC']._serialized_start=4016
+ _globals['_METRIC']._serialized_end=4105
# @@protoc_insertion_point(module_scope)
diff --git a/python/rocketmq/grpc_protocol/proto/definition.proto
b/python/rocketmq/grpc_protocol/proto/definition.proto
index 468c4105..2513bac0 100644
--- a/python/rocketmq/grpc_protocol/proto/definition.proto
+++ b/python/rocketmq/grpc_protocol/proto/definition.proto
@@ -146,6 +146,9 @@ enum MessageType {
// Messages that are transactional. Only committed messages are delivered to
// subscribers.
TRANSACTION = 4;
+
+ // lite topic
+ LITE = 5;
}
enum DigestType {
@@ -186,6 +189,8 @@ enum ClientType {
PUSH_CONSUMER = 2;
SIMPLE_CONSUMER = 3;
PULL_CONSUMER = 4;
+ LITE_PUSH_CONSUMER = 5;
+ LITE_SIMPLE_CONSUMER = 6;
}
enum Encoding {
@@ -270,6 +275,9 @@ message SystemProperties {
// Information to identify whether this message is from dead letter queue.
optional DeadLetterQueue dead_letter_queue = 20;
+
+ // lite topic
+ optional string lite_topic = 21;
}
message DeadLetterQueue {
@@ -348,6 +356,8 @@ enum Code {
ILLEGAL_POLLING_TIME = 40018;
// Offset is illegal.
ILLEGAL_OFFSET = 40019;
+ // Format of lite topic is illegal.
+ ILLEGAL_LITE_TOPIC = 40020;
// Generic code indicates that the client request lacks valid authentication
// credentials for the requested resource.
@@ -389,6 +399,10 @@ enum Code {
// Requests are throttled.
TOO_MANY_REQUESTS = 42900;
+ // LiteTopic related quota exceeded
+ LITE_TOPIC_QUOTA_EXCEEDED = 42901;
+ LITE_SUBSCRIPTION_QUOTA_EXCEEDED = 42902;
+
// Generic code for the case that the server is unwilling to process the request because its header fields are too large.
// The request may be resubmitted after reducing the size of the request header fields.
REQUEST_HEADER_FIELDS_TOO_LARGE = 43100;
@@ -548,6 +562,21 @@ message Subscription {
// Long-polling timeout for `ReceiveMessageRequest`, which is essential for
// push consumer.
optional google.protobuf.Duration long_polling_timeout = 5;
+
+ // Only lite push consumer
+ // client-side lite subscription quota limit
+ optional int32 lite_subscription_quota = 6;
+
+ // Only lite push consumer
+ // Maximum length limit for lite topic
+ optional int32 max_lite_topic_size = 7;
+}
+
+enum LiteSubscriptionAction {
+ PARTIAL_ADD = 0;
+ PARTIAL_REMOVE = 1;
+ COMPLETE_ADD = 2;
+ COMPLETE_REMOVE = 3;
}
message Metric {
diff --git a/python/rocketmq/grpc_protocol/proto/service.proto
b/python/rocketmq/grpc_protocol/proto/service.proto
index e5497f2d..ca809ff2 100644
--- a/python/rocketmq/grpc_protocol/proto/service.proto
+++ b/python/rocketmq/grpc_protocol/proto/service.proto
@@ -114,6 +114,7 @@ message ReceiveMessageResponse {
message AckMessageEntry {
string message_id = 1;
string receipt_handle = 2;
+ optional string lite_topic = 3;
}
message AckMessageRequest {
@@ -148,6 +149,7 @@ message ForwardMessageToDeadLetterQueueRequest {
string message_id = 4;
int32 delivery_attempt = 5;
int32 max_delivery_attempts = 6;
+ optional string lite_topic = 7;
}
message ForwardMessageToDeadLetterQueueResponse { Status status = 1; }
@@ -193,6 +195,10 @@ message RecoverOrphanedTransactionCommand {
string transaction_id = 2;
}
+message NotifyUnsubscribeLiteCommand {
+ string lite_topic = 1;
+}
+
message TelemetryCommand {
optional Status status = 1;
@@ -221,6 +227,9 @@ message TelemetryCommand {
// Request client to reconnect server use the latest endpoints.
ReconnectEndpointsCommand reconnect_endpoints_command = 8;
+
+ // Request client to unsubscribe lite topic.
+ NotifyUnsubscribeLiteCommand notify_unsubscribe_lite_command = 9;
}
}
@@ -311,6 +320,21 @@ message RecallMessageResponse {
string message_id = 2;
}
+message SyncLiteSubscriptionRequest {
+ LiteSubscriptionAction action = 1;
+ // bindTopic for lite push consumer
+ Resource topic = 2;
+ // consumer group
+ Resource group = 3;
+ // lite subscription set of lite topics
+ repeated string lite_topic_set = 4;
+ optional int64 version = 5;
+}
+
+message SyncLiteSubscriptionResponse {
+ Status status = 1;
+}
+
// For all the RPCs in MessagingService, the following error handling policies
// apply:
//
@@ -440,4 +464,8 @@ service MessagingService {
// for normal message, not supported for now.
rpc RecallMessage(RecallMessageRequest) returns (RecallMessageResponse) {
}
+
+ // Sync lite subscription info, lite push consumer only
+ rpc SyncLiteSubscription(SyncLiteSubscriptionRequest) returns
(SyncLiteSubscriptionResponse) {}
+
}
diff --git a/python/rocketmq/grpc_protocol/service_pb2.py
b/python/rocketmq/grpc_protocol/service_pb2.py
index bf080be2..718fae85 100644
--- a/python/rocketmq/grpc_protocol/service_pb2.py
+++ b/python/rocketmq/grpc_protocol/service_pb2.py
@@ -26,8 +26,7 @@ from google.protobuf import duration_pb2 as
google_dot_protobuf_dot_duration__pb
from google.protobuf import timestamp_pb2 as
google_dot_protobuf_dot_timestamp__pb2
from rocketmq.grpc_protocol import definition_pb2 as definition__pb2
-
-DESCRIPTOR =
_descriptor_pool.Default().AddSerializedFile(b'\n\rservice.proto\x12\x12\x61pache.rocketmq.v2\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x10\x64\x65\x66inition.proto\"r\n\x11QueryRouteRequest\x12+\n\x05topic\x18\x01
\x01(\x0b\x32\x1c.apache.rocketmq.v2.Resource\x12\x30\n\tendpoints\x18\x02
\x01(\x0b\x32\x1d.apache.rocketmq.v2.Endpoints\"z\n\x12QueryRouteResponse\x12*\n\x06status\x18\x01
\x01(\x0b\x32\x1a.apache.rocketmq.v2.Status\x12\x38 [...]
+DESCRIPTOR =
_descriptor_pool.Default().AddSerializedFile(b'\n\rservice.proto\x12\x12\x61pache.rocketmq.v2\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x10\x64\x65\x66inition.proto\"r\n\x11QueryRouteRequest\x12+\n\x05topic\x18\x01
\x01(\x0b\x32\x1c.apache.rocketmq.v2.Resource\x12\x30\n\tendpoints\x18\x02
\x01(\x0b\x32\x1d.apache.rocketmq.v2.Endpoints\"z\n\x12QueryRouteResponse\x12*\n\x06status\x18\x01
\x01(\x0b\x32\x1a.apache.rocketmq.v2.Status\x12\x38 [...]
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -54,67 +53,73 @@ if not _descriptor._USE_C_DESCRIPTORS:
_globals['_RECEIVEMESSAGERESPONSE']._serialized_start=1432
_globals['_RECEIVEMESSAGERESPONSE']._serialized_end=1619
_globals['_ACKMESSAGEENTRY']._serialized_start=1621
- _globals['_ACKMESSAGEENTRY']._serialized_end=1682
- _globals['_ACKMESSAGEREQUEST']._serialized_start=1685
- _globals['_ACKMESSAGEREQUEST']._serialized_end=1848
- _globals['_ACKMESSAGERESULTENTRY']._serialized_start=1850
- _globals['_ACKMESSAGERESULTENTRY']._serialized_end=1961
- _globals['_ACKMESSAGERESPONSE']._serialized_start=1963
- _globals['_ACKMESSAGERESPONSE']._serialized_end=2087
- _globals['_FORWARDMESSAGETODEADLETTERQUEUEREQUEST']._serialized_start=2090
- _globals['_FORWARDMESSAGETODEADLETTERQUEUEREQUEST']._serialized_end=2321
- _globals['_FORWARDMESSAGETODEADLETTERQUEUERESPONSE']._serialized_start=2323
- _globals['_FORWARDMESSAGETODEADLETTERQUEUERESPONSE']._serialized_end=2408
- _globals['_HEARTBEATREQUEST']._serialized_start=2411
- _globals['_HEARTBEATREQUEST']._serialized_end=2542
- _globals['_HEARTBEATRESPONSE']._serialized_start=2544
- _globals['_HEARTBEATRESPONSE']._serialized_end=2607
- _globals['_ENDTRANSACTIONREQUEST']._serialized_start=2610
- _globals['_ENDTRANSACTIONREQUEST']._serialized_end=2863
- _globals['_ENDTRANSACTIONRESPONSE']._serialized_start=2865
- _globals['_ENDTRANSACTIONRESPONSE']._serialized_end=2933
- _globals['_PRINTTHREADSTACKTRACECOMMAND']._serialized_start=2935
- _globals['_PRINTTHREADSTACKTRACECOMMAND']._serialized_end=2980
- _globals['_RECONNECTENDPOINTSCOMMAND']._serialized_start=2982
- _globals['_RECONNECTENDPOINTSCOMMAND']._serialized_end=3024
- _globals['_THREADSTACKTRACE']._serialized_start=3026
- _globals['_THREADSTACKTRACE']._serialized_end=3115
- _globals['_VERIFYMESSAGECOMMAND']._serialized_start=3117
- _globals['_VERIFYMESSAGECOMMAND']._serialized_end=3200
- _globals['_VERIFYMESSAGERESULT']._serialized_start=3202
- _globals['_VERIFYMESSAGERESULT']._serialized_end=3238
- _globals['_RECOVERORPHANEDTRANSACTIONCOMMAND']._serialized_start=3240
- _globals['_RECOVERORPHANEDTRANSACTIONCOMMAND']._serialized_end=3345
- _globals['_TELEMETRYCOMMAND']._serialized_start=3348
- _globals['_TELEMETRYCOMMAND']._serialized_end=3988
- _globals['_NOTIFYCLIENTTERMINATIONREQUEST']._serialized_start=3990
- _globals['_NOTIFYCLIENTTERMINATIONREQUEST']._serialized_end=4082
- _globals['_NOTIFYCLIENTTERMINATIONRESPONSE']._serialized_start=4084
- _globals['_NOTIFYCLIENTTERMINATIONRESPONSE']._serialized_end=4161
- _globals['_CHANGEINVISIBLEDURATIONREQUEST']._serialized_start=4164
- _globals['_CHANGEINVISIBLEDURATIONREQUEST']._serialized_end=4385
- _globals['_CHANGEINVISIBLEDURATIONRESPONSE']._serialized_start=4387
- _globals['_CHANGEINVISIBLEDURATIONRESPONSE']._serialized_end=4488
- _globals['_PULLMESSAGEREQUEST']._serialized_start=4491
- _globals['_PULLMESSAGEREQUEST']._serialized_end=4771
- _globals['_PULLMESSAGERESPONSE']._serialized_start=4774
- _globals['_PULLMESSAGERESPONSE']._serialized_end=4923
- _globals['_UPDATEOFFSETREQUEST']._serialized_start=4926
- _globals['_UPDATEOFFSETREQUEST']._serialized_end=5065
- _globals['_UPDATEOFFSETRESPONSE']._serialized_start=5067
- _globals['_UPDATEOFFSETRESPONSE']._serialized_end=5133
- _globals['_GETOFFSETREQUEST']._serialized_start=5135
- _globals['_GETOFFSETREQUEST']._serialized_end=5255
- _globals['_GETOFFSETRESPONSE']._serialized_start=5257
- _globals['_GETOFFSETRESPONSE']._serialized_end=5336
- _globals['_QUERYOFFSETREQUEST']._serialized_start=5339
- _globals['_QUERYOFFSETREQUEST']._serialized_end=5550
- _globals['_QUERYOFFSETRESPONSE']._serialized_start=5552
- _globals['_QUERYOFFSETRESPONSE']._serialized_end=5633
- _globals['_RECALLMESSAGEREQUEST']._serialized_start=5635
- _globals['_RECALLMESSAGEREQUEST']._serialized_end=5725
- _globals['_RECALLMESSAGERESPONSE']._serialized_start=5727
- _globals['_RECALLMESSAGERESPONSE']._serialized_end=5814
- _globals['_MESSAGINGSERVICE']._serialized_start=5817
- _globals['_MESSAGINGSERVICE']._serialized_end=7560
+ _globals['_ACKMESSAGEENTRY']._serialized_end=1722
+ _globals['_ACKMESSAGEREQUEST']._serialized_start=1725
+ _globals['_ACKMESSAGEREQUEST']._serialized_end=1888
+ _globals['_ACKMESSAGERESULTENTRY']._serialized_start=1890
+ _globals['_ACKMESSAGERESULTENTRY']._serialized_end=2001
+ _globals['_ACKMESSAGERESPONSE']._serialized_start=2003
+ _globals['_ACKMESSAGERESPONSE']._serialized_end=2127
+ _globals['_FORWARDMESSAGETODEADLETTERQUEUEREQUEST']._serialized_start=2130
+ _globals['_FORWARDMESSAGETODEADLETTERQUEUEREQUEST']._serialized_end=2401
+ _globals['_FORWARDMESSAGETODEADLETTERQUEUERESPONSE']._serialized_start=2403
+ _globals['_FORWARDMESSAGETODEADLETTERQUEUERESPONSE']._serialized_end=2488
+ _globals['_HEARTBEATREQUEST']._serialized_start=2491
+ _globals['_HEARTBEATREQUEST']._serialized_end=2622
+ _globals['_HEARTBEATRESPONSE']._serialized_start=2624
+ _globals['_HEARTBEATRESPONSE']._serialized_end=2687
+ _globals['_ENDTRANSACTIONREQUEST']._serialized_start=2690
+ _globals['_ENDTRANSACTIONREQUEST']._serialized_end=2943
+ _globals['_ENDTRANSACTIONRESPONSE']._serialized_start=2945
+ _globals['_ENDTRANSACTIONRESPONSE']._serialized_end=3013
+ _globals['_PRINTTHREADSTACKTRACECOMMAND']._serialized_start=3015
+ _globals['_PRINTTHREADSTACKTRACECOMMAND']._serialized_end=3060
+ _globals['_RECONNECTENDPOINTSCOMMAND']._serialized_start=3062
+ _globals['_RECONNECTENDPOINTSCOMMAND']._serialized_end=3104
+ _globals['_THREADSTACKTRACE']._serialized_start=3106
+ _globals['_THREADSTACKTRACE']._serialized_end=3195
+ _globals['_VERIFYMESSAGECOMMAND']._serialized_start=3197
+ _globals['_VERIFYMESSAGECOMMAND']._serialized_end=3280
+ _globals['_VERIFYMESSAGERESULT']._serialized_start=3282
+ _globals['_VERIFYMESSAGERESULT']._serialized_end=3318
+ _globals['_RECOVERORPHANEDTRANSACTIONCOMMAND']._serialized_start=3320
+ _globals['_RECOVERORPHANEDTRANSACTIONCOMMAND']._serialized_end=3425
+ _globals['_NOTIFYUNSUBSCRIBELITECOMMAND']._serialized_start=3427
+ _globals['_NOTIFYUNSUBSCRIBELITECOMMAND']._serialized_end=3477
+ _globals['_TELEMETRYCOMMAND']._serialized_start=3480
+ _globals['_TELEMETRYCOMMAND']._serialized_end=4213
+ _globals['_NOTIFYCLIENTTERMINATIONREQUEST']._serialized_start=4215
+ _globals['_NOTIFYCLIENTTERMINATIONREQUEST']._serialized_end=4307
+ _globals['_NOTIFYCLIENTTERMINATIONRESPONSE']._serialized_start=4309
+ _globals['_NOTIFYCLIENTTERMINATIONRESPONSE']._serialized_end=4386
+ _globals['_CHANGEINVISIBLEDURATIONREQUEST']._serialized_start=4389
+ _globals['_CHANGEINVISIBLEDURATIONREQUEST']._serialized_end=4610
+ _globals['_CHANGEINVISIBLEDURATIONRESPONSE']._serialized_start=4612
+ _globals['_CHANGEINVISIBLEDURATIONRESPONSE']._serialized_end=4713
+ _globals['_PULLMESSAGEREQUEST']._serialized_start=4716
+ _globals['_PULLMESSAGEREQUEST']._serialized_end=4996
+ _globals['_PULLMESSAGERESPONSE']._serialized_start=4999
+ _globals['_PULLMESSAGERESPONSE']._serialized_end=5148
+ _globals['_UPDATEOFFSETREQUEST']._serialized_start=5151
+ _globals['_UPDATEOFFSETREQUEST']._serialized_end=5290
+ _globals['_UPDATEOFFSETRESPONSE']._serialized_start=5292
+ _globals['_UPDATEOFFSETRESPONSE']._serialized_end=5358
+ _globals['_GETOFFSETREQUEST']._serialized_start=5360
+ _globals['_GETOFFSETREQUEST']._serialized_end=5480
+ _globals['_GETOFFSETRESPONSE']._serialized_start=5482
+ _globals['_GETOFFSETRESPONSE']._serialized_end=5561
+ _globals['_QUERYOFFSETREQUEST']._serialized_start=5564
+ _globals['_QUERYOFFSETREQUEST']._serialized_end=5775
+ _globals['_QUERYOFFSETRESPONSE']._serialized_start=5777
+ _globals['_QUERYOFFSETRESPONSE']._serialized_end=5858
+ _globals['_RECALLMESSAGEREQUEST']._serialized_start=5860
+ _globals['_RECALLMESSAGEREQUEST']._serialized_end=5950
+ _globals['_RECALLMESSAGERESPONSE']._serialized_start=5952
+ _globals['_RECALLMESSAGERESPONSE']._serialized_end=6039
+ _globals['_SYNCLITESUBSCRIPTIONREQUEST']._serialized_start=6042
+ _globals['_SYNCLITESUBSCRIPTIONREQUEST']._serialized_end=6279
+ _globals['_SYNCLITESUBSCRIPTIONRESPONSE']._serialized_start=6281
+ _globals['_SYNCLITESUBSCRIPTIONRESPONSE']._serialized_end=6355
+ _globals['_MESSAGINGSERVICE']._serialized_start=6358
+ _globals['_MESSAGINGSERVICE']._serialized_end=8226
# @@protoc_insertion_point(module_scope)
diff --git a/python/rocketmq/grpc_protocol/service_pb2_grpc.py
b/python/rocketmq/grpc_protocol/service_pb2_grpc.py
index b04a190d..091cad16 100644
--- a/python/rocketmq/grpc_protocol/service_pb2_grpc.py
+++ b/python/rocketmq/grpc_protocol/service_pb2_grpc.py
@@ -117,6 +117,11 @@ class MessagingServiceStub(object):
request_serializer=service__pb2.RecallMessageRequest.SerializeToString,
response_deserializer=service__pb2.RecallMessageResponse.FromString,
)
+ self.SyncLiteSubscription = channel.unary_unary(
+ '/apache.rocketmq.v2.MessagingService/SyncLiteSubscription',
+
request_serializer=service__pb2.SyncLiteSubscriptionRequest.SerializeToString,
+
response_deserializer=service__pb2.SyncLiteSubscriptionResponse.FromString,
+ )
class MessagingServiceServicer(object):
@@ -308,6 +313,13 @@ class MessagingServiceServicer(object):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
+ def SyncLiteSubscription(self, request, context):
+ """Sync lite subscription info, lite push consumer only
+ """
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
def add_MessagingServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
@@ -391,6 +403,11 @@ def add_MessagingServiceServicer_to_server(servicer,
server):
request_deserializer=service__pb2.RecallMessageRequest.FromString,
response_serializer=service__pb2.RecallMessageResponse.SerializeToString,
),
+ 'SyncLiteSubscription': grpc.unary_unary_rpc_method_handler(
+ servicer.SyncLiteSubscription,
+
request_deserializer=service__pb2.SyncLiteSubscriptionRequest.FromString,
+
response_serializer=service__pb2.SyncLiteSubscriptionResponse.SerializeToString,
+ ),
}
generic_handler = grpc.method_handlers_generic_handler(
'apache.rocketmq.v2.MessagingService', rpc_method_handlers)
@@ -682,3 +699,20 @@ class MessagingService(object):
service__pb2.RecallMessageResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout,
metadata)
+
+ @staticmethod
+ def SyncLiteSubscription(request,
+ target,
+ options=(),
+ channel_credentials=None,
+ call_credentials=None,
+ insecure=False,
+ compression=None,
+ wait_for_ready=None,
+ timeout=None,
+ metadata=None):
+ return grpc.experimental.unary_unary(request, target,
'/apache.rocketmq.v2.MessagingService/SyncLiteSubscription',
+ service__pb2.SyncLiteSubscriptionRequest.SerializeToString,
+ service__pb2.SyncLiteSubscriptionResponse.FromString,
+ options, channel_credentials,
+ insecure, call_credentials, compression, wait_for_ready, timeout,
metadata)
diff --git a/python/rocketmq/v5/client/client.py
b/python/rocketmq/v5/client/client.py
index 7ee51495..05f01625 100644
--- a/python/rocketmq/v5/client/client.py
+++ b/python/rocketmq/v5/client/client.py
@@ -60,6 +60,7 @@ class Client:
self.__client_callback_executor = None
self.__is_running = False
self.__had_shutdown = False
+ self._init_settings_event = threading.Event()
def startup(self):
try:
@@ -71,12 +72,15 @@ class Client:
self._pre_start()
try:
# pre update topic route for producer or consumer
- for topic in self.__topics:
- self.__update_topic_route(topic)
+ if self.__topics:
+ for topic in self.__topics:
+ if not self.__update_topic_route(topic):
+ raise Exception("update topic raise exception when
client startup")
+ self._init_settings_event.wait()
except Exception as e:
# ignore this exception and retrieve again when calling send
or receive
logger.warn(
- f"update topic exception when client startup, ignore it,
try it again in scheduler. exception: {e}"
+ f"update topic raise exception when client startup, ignore
it, try it again in scheduler. exception: {e}"
)
self.__start_scheduler()
self.__start_async_rpc_callback_executor()
@@ -104,6 +108,7 @@ class Client:
self.__rpc_client.stop()
self.__topic_route_cache.clear()
self.__topics.clear()
+ self._init_settings_event = None
self.__had_shutdown = True
self.__is_running = False
except Exception as e:
@@ -239,11 +244,13 @@ class Client:
def _retrieve_topic_route_data(self, topic):
route = self.__topic_route_cache.get(topic)
- if route is not None:
+ if route:
+ if topic not in self.__topics:
+ self.__topics.add(topic)
return route
else:
route = self.__update_topic_route(topic)
- if route is not None:
+ if route:
logger.info(f"{self} update topic:{topic} route success.")
self.__topics.add(topic)
return route
@@ -299,9 +306,9 @@ class Client:
res = future.result()
self.__handle_topic_route_res(res, topic)
except Exception as e:
- raise e
+ logger.error(f"query topic raise exception, {e}")
finally:
- if event is not None:
+ if event:
event.set()
def __topic_route_req(self, topic):
@@ -312,7 +319,7 @@ class Client:
return req
def __handle_topic_route_res(self, res, topic):
- if res is not None:
+ if res:
MessagingResultChecker.check(res.status)
if res.status.code == Code.OK:
topic_route = TopicRouteData(res.message_queues)
@@ -343,12 +350,12 @@ class Client:
def __heartbeat_callback(self, future, endpoints):
try:
res = future.result()
- if res is not None and res.status.code == Code.OK:
+ if res and res.status.code == Code.OK:
logger.info(
f"{self} send heartbeat to {endpoints} success."
)
else:
- if res is not None:
+ if res:
logger.error(
f"{self} send heartbeat to {endpoints} error,
code:{res.status.code}, message:{res.status.message}."
)
@@ -469,6 +476,7 @@ class Client:
if self.__client_callback_executor:
self.__client_callback_executor.shutdown()
self.__client_callback_executor = None
+ logger.info("stop client callback executor.")
""" property """
diff --git a/python/rocketmq/v5/client/connection/rpc_channel.py
b/python/rocketmq/v5/client/connection/rpc_channel.py
index 0842bc5e..6ca598e6 100644
--- a/python/rocketmq/v5/client/connection/rpc_channel.py
+++ b/python/rocketmq/v5/client/connection/rpc_channel.py
@@ -129,21 +129,18 @@ class RpcStreamStreamCall:
res = await self.__stream_stream_call.read()
if res.HasField("settings"):
# read a response for send setting result
- if res is not None and res.status.code == Code.OK:
+ if res and res.status.code == Code.OK:
logger.debug(
f"{ self.__handler} sync setting success.
response status code: {res.status.code}"
)
- if (
- res.settings is not None
- and res.settings.metric is not None
- ):
+ if res.settings and res.settings.metric:
# reset metrics if needed
self.__handler.reset_metric(res.settings.metric)
# sync setting
self.__handler.reset_setting(res.settings)
elif res.HasField("recover_orphaned_transaction_command"):
# sever check for a transaction message
- if self.__handler is not None:
+ if self.__handler:
transaction_id = (
res.recover_orphaned_transaction_command.transaction_id
)
@@ -161,14 +158,14 @@ class RpcStreamStreamCall:
)
async def stream_write(self, req):
- if self.__stream_stream_call is not None:
+ if self.__stream_stream_call:
try:
await self.__stream_stream_call.write(req)
except Exception as e:
raise e
def close(self):
- if self.__stream_stream_call is not None:
+ if self.__stream_stream_call:
self.__stream_stream_call.cancel()
@@ -189,9 +186,9 @@ class RpcChannel:
self.__create_aio_channel()
def close_channel(self, loop):
- if self.__async_channel is not None:
+ if self.__async_channel:
# close stream_stream_call
- if self.__telemetry_stream_stream_call is not None:
+ if self.__telemetry_stream_stream_call:
self.__telemetry_stream_stream_call.close()
self.__telemetry_stream_stream_call = None
logger.info(
@@ -220,7 +217,7 @@ class RpcChannel:
def __create_aio_channel(self):
try:
- if self.__endpoints is None:
+ if not self.__endpoints:
raise IllegalArgumentException(
"create_aio_channel exception, endpoints is None"
)
diff --git a/python/rocketmq/v5/client/connection/rpc_client.py
b/python/rocketmq/v5/client/connection/rpc_client.py
index c5c5f3ec..21237f94 100644
--- a/python/rocketmq/v5/client/connection/rpc_client.py
+++ b/python/rocketmq/v5/client/connection/rpc_client.py
@@ -28,6 +28,7 @@ from rocketmq.grpc_protocol import (AckMessageRequest,
QueryAssignmentRequest, QueryRouteRequest,
RecallMessageRequest,
ReceiveMessageRequest, SendMessageRequest,
+ SyncLiteSubscriptionRequest,
TelemetryCommand)
from rocketmq.v5.client.connection import RpcChannel, RpcEndpoints
from rocketmq.v5.log import logger
@@ -193,12 +194,14 @@ class RpcClient:
return RpcClient.__run_message_service_async(
self.__forward_message_to_dead_letter_queue_async_0(endpoints,
req, metadata=metadata, timeout=timeout))
- """ build stream_stream_call """
+    def sync_lite_subscription_async(self, endpoints: RpcEndpoints, req: SyncLiteSubscriptionRequest, metadata, timeout=3):
+        """Schedule a SyncLiteSubscription request for ``endpoints``.
+
+        Builds the coroutine for the call and hands it to
+        ``__run_message_service_async``; whatever that helper returns is
+        passed back to the caller unchanged.
+        """
+        sync_call = self.__sync_lite_subscription_0(endpoints, req, metadata=metadata, timeout=timeout)
+        return RpcClient.__run_message_service_async(sync_call)
def telemetry_stream(
self, endpoints: RpcEndpoints, client, metadata, rebuild, timeout=3000
):
- # assert asyncio.get_running_loop() == RpcClient._io_loop
+ # build grpc stream_stream_call
try:
channel = self.retrieve_or_create_channel(endpoints)
stream = channel.async_stub.Telemetry(
@@ -298,6 +301,10 @@ class RpcClient:
return await
self.retrieve_or_create_channel(endpoints).async_stub.ForwardMessageToDeadLetterQueue(req,
metadata=metadata,
timeout=timeout)
+    async def __sync_lite_subscription_0(self, endpoints: RpcEndpoints, req: SyncLiteSubscriptionRequest, metadata, timeout=3):
+        """Await the SyncLiteSubscription call on the channel for ``endpoints``."""
+        channel = self.retrieve_or_create_channel(endpoints)
+        return await channel.async_stub.SyncLiteSubscription(req, metadata=metadata, timeout=timeout)
+
async def __create_channel_async(self, endpoints: RpcEndpoints):
return self.retrieve_or_create_channel(endpoints)
diff --git a/python/rocketmq/v5/consumer/__init__.py
b/python/rocketmq/v5/consumer/__init__.py
index b6a38b3b..daaf81c6 100644
--- a/python/rocketmq/v5/consumer/__init__.py
+++ b/python/rocketmq/v5/consumer/__init__.py
@@ -13,13 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from .message_listener import ConsumeResult, MessageListener
-from .push_consumer import PushConsumer
-from .simple_consumer import SimpleConsumer
+from .push import (ConsumeResult, LitePushConsumer, MessageListener,
+ PushConsumer)
+from .simple import SimpleConsumer
__all__ = [
"SimpleConsumer",
"PushConsumer",
+ "LitePushConsumer",
"MessageListener",
"ConsumeResult",
]
diff --git a/python/rocketmq/v5/consumer/consumer.py
b/python/rocketmq/v5/consumer/consumer.py
index 8be58ae3..dfcd46e5 100644
--- a/python/rocketmq/v5/consumer/consumer.py
+++ b/python/rocketmq/v5/consumer/consumer.py
@@ -57,7 +57,7 @@ class Consumer(Client):
self._consumer_group = consumer_group
# <String /* topic */, FilterExpression>
self._subscriptions = ConcurrentMap()
- if subscription is not None:
+ if subscription:
self._subscriptions.update(subscription)
def subscribe(self, topic, filter_expression: FilterExpression = None):
@@ -94,14 +94,16 @@ class Consumer(Client):
self._subscriptions.remove(topic)
self._remove_unused_topic_route_data(topic)
- def ack(self, message: Message):
+ """ protect """
+
+ def _ack(self, message: Message):
try:
future = self.__ack(message)
self.__handle_ack_result(future)
except Exception as e:
raise e
- def ack_async(self, message: Message):
+ def _ack_async(self, message: Message):
try:
future = self.__ack(message)
ret_future = Future()
@@ -113,14 +115,14 @@ class Consumer(Client):
except Exception as e:
raise e
- def change_invisible_duration(self, message: Message, invisible_duration):
+ def _change_invisible_duration(self, message: Message, invisible_duration):
try:
future = self.__change_invisible_duration(message,
invisible_duration)
self.__handle_change_invisible_result(future, message)
except Exception as e:
raise e
- def change_invisible_duration_async(self, message: Message,
invisible_duration):
+ def _change_invisible_duration_async(self, message: Message,
invisible_duration):
try:
future = self.__change_invisible_duration(message,
invisible_duration)
ret_future = Future()
@@ -132,8 +134,6 @@ class Consumer(Client):
except Exception as e:
raise e
- """ protect """
-
def _receive(self, queue, req, timeout):
try:
receive_future = self.rpc_client.receive_message_async(
@@ -170,8 +170,9 @@ class Consumer(Client):
req.group.name = self._consumer_group
req.group.resource_namespace = self.client_configuration.namespace
req.message_queue.CopyFrom(queue.message_queue0())
- req.filter_expression.type = filter_expression.filter_type
- req.filter_expression.expression = filter_expression.expression
+ if filter_expression:
+ req.filter_expression.type = filter_expression.filter_type
+ req.filter_expression.expression = filter_expression.expression
req.batch_size = max_message_num
if invisible_duration:
req.invisible_duration.seconds = invisible_duration
@@ -250,6 +251,8 @@ class Consumer(Client):
msg_entry = AckMessageEntry()
msg_entry.message_id = message.message_id
msg_entry.receipt_handle = message.receipt_handle
+ if self.client_type == ClientType.LITE_PUSH_CONSUMER:
+ msg_entry.lite_topic = message.lite_topic
req.entries.append(msg_entry)
return req
diff --git a/python/rocketmq/v5/consumer/__init__.py
b/python/rocketmq/v5/consumer/push/__init__.py
similarity index 92%
copy from python/rocketmq/v5/consumer/__init__.py
copy to python/rocketmq/v5/consumer/push/__init__.py
index b6a38b3b..3afabd9b 100644
--- a/python/rocketmq/v5/consumer/__init__.py
+++ b/python/rocketmq/v5/consumer/push/__init__.py
@@ -13,13 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from .lite_push_consumer import LitePushConsumer
from .message_listener import ConsumeResult, MessageListener
from .push_consumer import PushConsumer
-from .simple_consumer import SimpleConsumer
__all__ = [
- "SimpleConsumer",
"PushConsumer",
+ "LitePushConsumer",
"MessageListener",
"ConsumeResult",
]
diff --git a/python/rocketmq/v5/consumer/consumption.py
b/python/rocketmq/v5/consumer/push/consumption.py
similarity index 97%
rename from python/rocketmq/v5/consumer/consumption.py
rename to python/rocketmq/v5/consumer/push/consumption.py
index b4170241..2437a768 100644
--- a/python/rocketmq/v5/consumer/consumption.py
+++ b/python/rocketmq/v5/consumer/push/consumption.py
@@ -16,10 +16,9 @@
import functools
from concurrent.futures import ThreadPoolExecutor
+from rocketmq.v5.consumer.push.message_listener import ConsumeResult
from rocketmq.v5.log import logger
-from .message_listener import ConsumeResult
-
class Consumption:
diff --git a/python/rocketmq/v5/consumer/fifo_consumption.py
b/python/rocketmq/v5/consumer/push/fifo_consumption.py
similarity index 92%
rename from python/rocketmq/v5/consumer/fifo_consumption.py
rename to python/rocketmq/v5/consumer/push/fifo_consumption.py
index 5d4f0658..035d8947 100644
--- a/python/rocketmq/v5/consumer/fifo_consumption.py
+++ b/python/rocketmq/v5/consumer/push/fifo_consumption.py
@@ -18,9 +18,8 @@ import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
-from rocketmq.v5.consumer.message_listener import ConsumeResult
+from rocketmq.v5.consumer.push.message_listener import ConsumeResult
from rocketmq.v5.log import logger
-from rocketmq.v5.model.retry_policy import RetryPolicy
class FifoConsumption:
@@ -90,12 +89,8 @@ class FifoConsumption:
return consume_result
# reconsume
- if self.__backoff_policy:
- max_attempts = self.__backoff_policy.max_attempts
- attempt_delay =
self.__backoff_policy.get_next_attempt_delay(attempt)
- else:
- max_attempts = RetryPolicy.DEFAULT_MAX_ATTEMPTS
- attempt_delay = RetryPolicy.DEFAULT_RECONSUME_DELAY
+ max_attempts = self.__backoff_policy.max_attempts
+ attempt_delay = self.__backoff_policy.get_next_attempt_delay(attempt)
if attempt >= max_attempts:
return consume_result
time.sleep(attempt_delay)
diff --git a/python/rocketmq/v5/consumer/push/lite_push_consumer.py
b/python/rocketmq/v5/consumer/push/lite_push_consumer.py
new file mode 100644
index 00000000..56d0609a
--- /dev/null
+++ b/python/rocketmq/v5/consumer/push/lite_push_consumer.py
@@ -0,0 +1,160 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from rocketmq.grpc_protocol import (ClientType, Code, LiteSubscriptionAction,
+ SyncLiteSubscriptionRequest)
+from rocketmq.v5.client import ClientConfiguration, ClientScheduler
+from rocketmq.v5.exception import (IllegalStateException,
+ LiteSubscriptionQuotaExceededException)
+from rocketmq.v5.log import logger
+from rocketmq.v5.model.filter_expression import FilterExpression
+from rocketmq.v5.util import AtomicInteger, MessagingResultChecker
+
+from .message_listener import MessageListener
+from .push_consumer import PushConsumer
+
+
+class LitePushConsumer(PushConsumer):
+    """Push consumer for lite topics that live under a single bound topic.
+
+    The bound topic is subscribed once at construction time with a default
+    ``FilterExpression``. Lite topics are then added and removed through
+    :meth:`subscribe_lite` and :meth:`unsubscribe_lite`; the regular
+    :meth:`subscribe` / :meth:`unsubscribe` entry points are disabled.
+    """
+
+    def __init__(
+        self,
+        client_configuration: ClientConfiguration,
+        consumer_group,
+        bind_topic,
+        message_listener: MessageListener = None,
+        max_cache_message_count=1024,
+        max_cache_message_size=64 * 1024 * 1024,  # in bytes, 64MB default
+        consumption_thread_count=20,
+        tls_enable=False,
+    ):
+        """Create a lite push consumer bound to ``bind_topic``.
+
+        Raises:
+            Exception: if ``bind_topic`` is empty or None.
+        """
+        if not bind_topic:
+            raise Exception("bindTopic has not been set yet.")
+
+        super().__init__(
+            client_configuration,
+            consumer_group,
+            message_listener,
+            {bind_topic: FilterExpression()},
+            max_cache_message_count,
+            max_cache_message_size,
+            consumption_thread_count,
+            tls_enable,
+            ClientType.LITE_PUSH_CONSUMER
+        )
+        self.__bind_topic = bind_topic
+        # lite topics for which a PARTIAL_ADD request returned Code.OK
+        self.__lite_topics = set()
+        # both limits start with local defaults and are replaced in reset_setting()
+        self.__lite_subscription_quota = AtomicInteger(0)
+        self.__max_lite_topic_size = 64
+
+    """ override """
+
+    def _on_start(self):
+        """Start the parent consumer, then the periodic full-sync scheduler."""
+        super()._on_start()
+        try:
+            # NOTE(review): the two 30s presumably are initial delay and period in seconds - confirm against ClientScheduler
+            self._sync_all_lite_subscription_scheduler = ClientScheduler(
+                f"{self.client_id}_sync_all_lite_subscription_scheduler_thread",
+                self.__sync_all_lite_subscription,
+                30,
+                30,
+                self._rpc_channel_io_loop(),
+            )
+            self._sync_all_lite_subscription_scheduler.start_scheduler()
+            logger.info("start sync all lite subscription scheduler success.")
+        except Exception as e:
+            logger.error(f"{self} on start error, {e}")
+            # bare raise keeps the original traceback
+            raise
+
+    def reset_setting(self, settings):
+        """Apply server settings, updating the lite limits when they are present.
+
+        The lite quota and the max lite topic length are only replaced when
+        both values are set in ``settings.subscription``.
+        """
+        subscription = settings.subscription if settings else None
+        if subscription and subscription.lite_subscription_quota and subscription.max_lite_topic_size:
+            self.__lite_subscription_quota = AtomicInteger(subscription.lite_subscription_quota)
+            self.__max_lite_topic_size = subscription.max_lite_topic_size
+        else:
+            logger.warn(f"{self} settings carry no lite subscription limits, keep current values.")
+        # Always delegate: the parent implementation sets _init_settings_event,
+        # and skipping it would leave a startup that waits on that event
+        # blocked forever.
+        super().reset_setting(settings)
+
+    """ public """
+
+    def subscribe_lite(self, lite_topic):
+        """Subscribe to ``lite_topic`` under the bound topic.
+
+        Already subscribed topics are a no-op. Failures of the remote call
+        are logged and not re-raised; the topic is only recorded locally
+        when the server answers with ``Code.OK``.
+
+        Raises:
+            IllegalStateException: consumer not running, blank topic, or
+                topic longer than the allowed maximum.
+            LiteSubscriptionQuotaExceededException: the local quota would
+                be exceeded by this subscription.
+        """
+        if not self.is_running:
+            raise IllegalStateException("unable to add lite subscription because consumer is not running")
+        if not lite_topic:
+            raise IllegalStateException("liteTopic is blank.")
+        if lite_topic in self.__lite_topics:
+            return
+        if len(lite_topic) > self.__max_lite_topic_size:
+            raise IllegalStateException(
+                f"lite_topic: {lite_topic} length exceeded max length {self.__max_lite_topic_size}.")
+        if len(self.__lite_topics) + 1 > self.__lite_subscription_quota.get():
+            raise LiteSubscriptionQuotaExceededException(
+                f"Lite subscription exceed quota: {self.__lite_subscription_quota.get()} ",
+                Code.LITE_SUBSCRIPTION_QUOTA_EXCEEDED)
+        try:
+            res = self.__sync_lite_subscription({lite_topic}, LiteSubscriptionAction.PARTIAL_ADD)
+            if res.status.code == Code.OK:
+                self.__lite_topics.add(lite_topic)
+                logger.info(f"[{self}] subscribe lite_topic:{lite_topic} success.")
+        except Exception as e:
+            # a failed subscription is an error, not routine information
+            logger.error(f"[{self}] subscribe lite_topic:{lite_topic} raise exception, {e}.")
+
+    def unsubscribe_lite(self, lite_topic):
+        """Remove the subscription to ``lite_topic``.
+
+        Unknown topics are a no-op. Failures of the remote call are logged
+        and not re-raised.
+
+        Raises:
+            IllegalStateException: the consumer is not running.
+        """
+        if not self.is_running:
+            raise IllegalStateException("unable to remove lite subscription because consumer is not running")
+        if lite_topic not in self.__lite_topics:
+            return
+        try:
+            res = self.__sync_lite_subscription({lite_topic}, LiteSubscriptionAction.PARTIAL_REMOVE)
+            if res.status.code == Code.OK:
+                # discard: the topic may already be gone if another thread removed it meanwhile
+                self.__lite_topics.discard(lite_topic)
+                logger.info(f"[{self}] unsubscribe lite_topic:{lite_topic} success.")
+        except Exception as e:
+            logger.error(f"[{self}] unsubscribe lite_topic:{lite_topic} raise exception, {e}.")
+
+    def subscribe(self, topic, filter_expression: FilterExpression = None):
+        """Not supported; use :meth:`subscribe_lite`."""
+        raise NotImplementedError("LitePushConsumer does not support topic subscription.")
+
+    def unsubscribe(self, topic):
+        """Not supported; use :meth:`unsubscribe_lite`."""
+        raise NotImplementedError("LitePushConsumer does not support topic unsubscription.")
+
+    """ private """
+
+    def __sync_all_lite_subscription(self):
+        """Scheduler task: send the complete lite topic set to the server."""
+        if not self.__lite_topics:
+            return
+        try:
+            # copy so the request is built from a stable snapshot of the set
+            res = self.__sync_lite_subscription(self.__lite_topics.copy(), LiteSubscriptionAction.COMPLETE_ADD)
+            if res.status.code == Code.OK:
+                logger.info(
+                    f"{self} sync all lite subscription to {self.client_configuration.rpc_endpoints} success.")
+        except Exception as e:
+            logger.error(
+                f"[{self}] sync all lite subscription to {self.client_configuration.rpc_endpoints} raise exception, {e}")
+
+    def __sync_lite_subscription(self, lite_topics, action: LiteSubscriptionAction):
+        """Send one SyncLiteSubscription request, wait for it and check its status.
+
+        Returns the response; any exception from the call or from
+        ``MessagingResultChecker.check`` propagates to the caller.
+        """
+        res = self.rpc_client.sync_lite_subscription_async(
+            self.client_configuration.rpc_endpoints,
+            self.__sync_lite_subscription_req(lite_topics, action),
+            metadata=self._sign(),
+            timeout=self.client_configuration.request_timeout,
+        ).result()
+        MessagingResultChecker.check(res.status)
+        return res
+
+    def __sync_lite_subscription_req(self, lite_topics, action: LiteSubscriptionAction):
+        """Build the SyncLiteSubscriptionRequest for ``lite_topics`` and ``action``."""
+        req = SyncLiteSubscriptionRequest()
+        req.action = action
+        req.topic.name = self.__bind_topic
+        req.topic.resource_namespace = self.client_configuration.namespace
+        req.group.name = self._consumer_group
+        req.group.resource_namespace = self.client_configuration.namespace
+        req.lite_topic_set.extend(lite_topics)
+        return req
diff --git a/python/rocketmq/v5/consumer/message_listener.py
b/python/rocketmq/v5/consumer/push/message_listener.py
similarity index 100%
rename from python/rocketmq/v5/consumer/message_listener.py
rename to python/rocketmq/v5/consumer/push/message_listener.py
diff --git a/python/rocketmq/v5/consumer/push_consumer.py
b/python/rocketmq/v5/consumer/push/push_consumer.py
similarity index 85%
rename from python/rocketmq/v5/consumer/push_consumer.py
rename to python/rocketmq/v5/consumer/push/push_consumer.py
index 03c62736..e0170988 100644
--- a/python/rocketmq/v5/consumer/push_consumer.py
+++ b/python/rocketmq/v5/consumer/push/push_consumer.py
@@ -24,11 +24,11 @@ from rocketmq.grpc_protocol import (ClientType, Code,
QueryAssignmentRequest)
from rocketmq.v5.client import ClientConfiguration, ClientScheduler
from rocketmq.v5.consumer.consumer import Consumer
-from rocketmq.v5.consumer.consumption import Consumption
-from rocketmq.v5.consumer.fifo_consumption import FifoConsumption
-from rocketmq.v5.consumer.message_listener import (ConsumeResult,
- MessageListener)
-from rocketmq.v5.exception import BadRequestException, IllegalStateException
+from rocketmq.v5.consumer.push.consumption import Consumption
+from rocketmq.v5.consumer.push.fifo_consumption import FifoConsumption
+from rocketmq.v5.consumer.push.message_listener import (ConsumeResult,
+ MessageListener)
+from rocketmq.v5.exception import BadRequestException
from rocketmq.v5.log import logger
from rocketmq.v5.model.assignment import Assignments
from rocketmq.v5.model.process_queue import ProcessQueue
@@ -39,6 +39,7 @@ from rocketmq.v5.util import ConcurrentMap,
MessagingResultChecker
class PushConsumer(Consumer):
ACK_MESSAGE_FAILURE_BACKOFF_DELAY = 1
+ DEFAULT_CONSUME_ATTEMPTS = 17
RECEIVE_RETRY_DELAY = 1
def __init__(
@@ -50,12 +51,13 @@ class PushConsumer(Consumer):
max_cache_message_count=1024,
max_cache_message_size=64 * 1024 * 1024, # in bytes, 64MB default
consumption_thread_count=20,
- tls_enable=False
+ tls_enable=False,
+ client_type=ClientType.PUSH_CONSUMER
):
super().__init__(
client_configuration,
consumer_group,
- ClientType.PUSH_CONSUMER,
+ client_type,
subscription,
tls_enable
)
@@ -73,11 +75,15 @@ class PushConsumer(Consumer):
self.__receive_batch_size = 32
self.__long_polling_timeout = 30 # seconds
- def set_listener(self, message_listener: MessageListener):
- self.__message_listener = message_listener
-
""" override """
+    def unsubscribe(self, topic):
+        """Unsubscribe from ``topic`` and drop every process queue that belongs to it."""
+        super().unsubscribe(topic)
+        # Collect the matching queues before dropping them, so the map is not
+        # mutated while it is being iterated.
+        dropped_message_queues = [
+            message_queue
+            for message_queue, _ in self.__process_queues.items()
+            if message_queue.topic == topic
+        ]
+        self.__drop_message_queues(dropped_message_queues)
+        self.__assignments.remove(topic)
+
def shutdown(self):
logger.info(f"begin to to shutdown {self}.")
super().shutdown()
@@ -85,28 +91,34 @@ class PushConsumer(Consumer):
logger.info(f"shutdown {self} success.")
def reset_setting(self, settings):
- self.__long_polling_timeout =
settings.subscription.long_polling_timeout.seconds
- self.__configure_consumer_consumption(settings)
+ if settings:
+ self.__long_polling_timeout =
settings.subscription.long_polling_timeout.seconds
+ self.__configure_consumer_consumption(settings)
+ if not self._init_settings_event.is_set():
+ self._init_settings_event.set()
def reset_metric(self, metric):
super().reset_metric(metric)
self.__register_process_queues_gauges()
- def _sync_setting_req(self, endpoints):
- req = super()._sync_setting_req(endpoints)
- req.settings.subscription.long_polling_timeout.seconds =
self.__long_polling_timeout
- return req
+ # def _sync_setting_req(self, endpoints):
+ # req = super()._sync_setting_req(endpoints)
+ # req.settings.subscription.long_polling_timeout.seconds =
self.__long_polling_timeout
+ # return req
def _pre_start(self):
if not self.__message_listener:
- logger.error("message listener has not been set yet")
- raise IllegalStateException(f"{self}'s message listener has not
been set yet.")
+ raise Exception("messageListener has not been set yet.")
+
+ if not self._subscriptions.keys():
+ raise Exception("subscription expressions have not been set yet.")
def _on_start(self):
try:
self.__start_async_executor()
self.__scan_assignment_scheduler =
ClientScheduler(f"{self.client_id}_scan_assignment_schedule_thread",
self.__scan_assignment, 1, 5, self._rpc_channel_io_loop())
self.__scan_assignment_scheduler.start_scheduler()
+ logger.info("start scan assignment scheduler success.")
logger.info(f"{self} start success.")
except Exception as e:
logger.error(f"{self} on start error, {e}")
@@ -128,10 +140,11 @@ class PushConsumer(Consumer):
raise e
def __configure_consumer_consumption(self, settings):
- if settings.backoff_policy.WhichOneof("strategy") ==
"customized_backoff":
- backoff_policy =
CustomizedBackoffRetryPolicy(settings.backoff_policy)
+ use_customized_backoff =
settings.backoff_policy.WhichOneof("strategy") == "customized_backoff"
+ if use_customized_backoff:
+ backoff_policy =
CustomizedBackoffRetryPolicy(settings.backoff_policy,
PushConsumer.DEFAULT_CONSUME_ATTEMPTS)
else:
- backoff_policy = CustomizedBackoffRetryPolicy(None)
+ backoff_policy = CustomizedBackoffRetryPolicy(None,
PushConsumer.DEFAULT_CONSUME_ATTEMPTS)
if not self.__consumption:
self.__fifo = settings.subscription.fifo
@@ -143,8 +156,10 @@ class PushConsumer(Consumer):
self.__consumption = FifoConsumption(self.__message_listener,
self.__consumption_thread_count,
self.__on_fifo_consumption_result,
self.client_id,
backoff_policy)
+ logger.info(f"configure {self} consumption success, fifo:
{self.__fifo}")
else:
- self.__consumption.backoff_policy = backoff_policy
+ if use_customized_backoff and self.__consumption.backoff_policy !=
backoff_policy:
+ self.__consumption.backoff_policy = backoff_policy
# assignment #
@@ -183,12 +198,14 @@ class PushConsumer(Consumer):
else:
existed = self.__assignments.get(topic)
if existed == new_assignments:
+ self.__drop_expired_message_queue()
return
else:
# topic route changed
removed_message_queues = Assignments.diff_queues(existed,
new_assignments)
self.__drop_message_queues(removed_message_queues)
new_message_queues =
Assignments.diff_queues(new_assignments, existed)
+ self.__assignments.put(topic, new_assignments)
self.__drop_expired_message_queue()
self.__process_message_queues(new_message_queues)
except Exception as e:
@@ -210,7 +227,7 @@ class PushConsumer(Consumer):
logger.error(f"queue: {message_queue} end receive, because
consumer is not running.")
return
if process_queue.dropped:
- logger.error(f"queue: {message_queue} end receive, because queue
is dropped. ")
+ logger.info(f"queue: {message_queue} end receive, because queue is
dropped. ")
return
if not attempt_id:
attempt_id = self.__generate_attempt_id()
@@ -276,14 +293,14 @@ class PushConsumer(Consumer):
# in consume thread
try:
if consume_result == ConsumeResult.SUCCESS:
- self.ack(message)
+ self._ack(message)
else:
delivery_attempt = message.delivery_attempt
if self.__consumption.backoff_policy:
invisible_duration =
self.__consumption.backoff_policy.get_next_attempt_delay(delivery_attempt)
else:
invisible_duration = 30
- self.change_invisible_duration(message, invisible_duration)
+ self._change_invisible_duration(message, invisible_duration)
self.__evict_message(message, message_queue)
except Exception as e:
logger.error(f"ack or nack raise exception, {e}")
@@ -322,6 +339,8 @@ class PushConsumer(Consumer):
req.message_id = message.message_id
req.delivery_attempt = message.delivery_attempt
req.max_delivery_attempts =
self.__consumption.backoff_policy.max_attempts
+ if self.client_type == ClientType.LITE_PUSH_CONSUMER:
+ req.lite_topic = message.lite_topic
return req
def __forward_message_to_dead_letter_queue(self, message):
@@ -349,11 +368,14 @@ class PushConsumer(Consumer):
for message_queue, process_queue in self.__process_queues.items():
self.__execute_receive(message_queue, process_queue)
+    def __drop_message_queue(self, dropped_message_queue):
+        """Remove ``dropped_message_queue`` from the process-queue map and drop its process queue."""
+        process_queue = self.__process_queues.remove(dropped_message_queue)
+        if not process_queue:
+            # remove() gave back nothing to drop for this message queue
+            return
+        process_queue.drop()
+
def __drop_message_queues(self, dropped_message_queues):
for dropped_message_queue in dropped_message_queues:
- dropped_process_queue =
self.__process_queues.remove(dropped_message_queue)
- if dropped_process_queue:
- dropped_process_queue.drop()
+ self.__drop_message_queue(dropped_message_queue)
def __drop_expired_message_queue(self):
expired_message_queue = [
@@ -378,7 +400,7 @@ class PushConsumer(Consumer):
if process_queue:
process_queue.evict_message(message)
- def __aggregate_process_queues_by_topic(self, attr_name: str):
+ def __aggregate_process_queues_by_attr_name(self, attr_name: str):
topic_values = {}
for message_queue, process_queue in self.__process_queues.items():
topic = message_queue.topic
@@ -397,10 +419,10 @@ class PushConsumer(Consumer):
]
def __process_queues_cached_count(self):
- return
self.__aggregate_process_queues_by_topic("cached_messages_count")
+ return
self.__aggregate_process_queues_by_attr_name("cached_messages_count")
def __process_queues_cached_bytes(self):
- return
self.__aggregate_process_queues_by_topic("cached_messages_bytes")
+ return
self.__aggregate_process_queues_by_attr_name("cached_messages_bytes")
def __register_process_queues_gauges(self):
self.client_metrics.create_push_consumer_process_queue_observable_gauge(
diff --git a/python/rocketmq/v5/consumer/__init__.py
b/python/rocketmq/v5/consumer/simple/__init__.py
similarity index 83%
copy from python/rocketmq/v5/consumer/__init__.py
copy to python/rocketmq/v5/consumer/simple/__init__.py
index b6a38b3b..bd12e42f 100644
--- a/python/rocketmq/v5/consumer/__init__.py
+++ b/python/rocketmq/v5/consumer/simple/__init__.py
@@ -13,13 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from .message_listener import ConsumeResult, MessageListener
-from .push_consumer import PushConsumer
from .simple_consumer import SimpleConsumer
__all__ = [
"SimpleConsumer",
- "PushConsumer",
- "MessageListener",
- "ConsumeResult",
]
diff --git a/python/rocketmq/v5/consumer/simple_consumer.py
b/python/rocketmq/v5/consumer/simple/simple_consumer.py
similarity index 90%
rename from python/rocketmq/v5/consumer/simple_consumer.py
rename to python/rocketmq/v5/consumer/simple/simple_consumer.py
index 332ae9d3..a29f5532 100644
--- a/python/rocketmq/v5/consumer/simple_consumer.py
+++ b/python/rocketmq/v5/consumer/simple/simple_consumer.py
@@ -72,6 +72,10 @@ class SimpleConsumer(Consumer):
super().shutdown()
logger.info(f"shutdown {self} success.")
+ def reset_setting(self, settings):
+ if not self._init_settings_event.is_set():
+ self._init_settings_event.set()
+
def _on_start(self):
logger.info(f"{self} start success.")
@@ -89,6 +93,20 @@ class SimpleConsumer(Consumer):
return
queue_selector.update(topic_route)
+ """ public """
+
+ def ack(self, message):
+ self._ack(message)
+
+ def ack_async(self, message):
+ self._ack_async(message)
+
+ def change_invisible_duration(self, message, invisible_duration):
+ self._change_invisible_duration(message, invisible_duration)
+
+ def change_invisible_duration_async(self, message, invisible_duration):
+ self._change_invisible_duration_async(message, invisible_duration)
+
""" private """
def __select_topic_for_receive(self):
diff --git a/python/rocketmq/v5/exception/__init__.py
b/python/rocketmq/v5/exception/__init__.py
index 2f58a9fd..71a8ef8b 100644
--- a/python/rocketmq/v5/exception/__init__.py
+++ b/python/rocketmq/v5/exception/__init__.py
@@ -16,7 +16,10 @@
from .client_exception import (BadRequestException, ClientException,
ForbiddenException, IllegalArgumentException,
IllegalStateException, InternalErrorException,
- NotFoundException, PayloadTooLargeException,
+ LiteSubscriptionQuotaExceededException,
+ LiteTopicQuotaExceededException,
+ NotFoundException, PayloadEmptyException,
+ PayloadTooLargeException,
PaymentRequiredException, ProxyTimeoutException,
RequestHeaderFieldsTooLargeException,
TooManyRequestsException, UnauthorizedException,
@@ -30,7 +33,10 @@ __all__ = [
"ForbiddenException",
"NotFoundException",
"PayloadTooLargeException",
+ "PayloadEmptyException",
"TooManyRequestsException",
+ "LiteSubscriptionQuotaExceededException",
+ "LiteTopicQuotaExceededException",
"RequestHeaderFieldsTooLargeException",
"InternalErrorException",
"ProxyTimeoutException",
diff --git a/python/rocketmq/v5/exception/client_exception.py
b/python/rocketmq/v5/exception/client_exception.py
index cd74134a..7c9e2bf0 100644
--- a/python/rocketmq/v5/exception/client_exception.py
+++ b/python/rocketmq/v5/exception/client_exception.py
@@ -21,7 +21,7 @@ class ClientException(Exception):
self.__code = code
def __str__(self):
- if self.__code is not None:
+ if self.__code:
return f"{self.__code}, {super().__str__()}"
else:
return f"{super().__str__()}"
@@ -32,69 +32,59 @@ class ClientException(Exception):
class BadRequestException(ClientException):
-
- def __init__(self, message, code):
- super().__init__(message, code)
+ pass
class UnauthorizedException(ClientException):
-
- def __init__(self, message, code):
- super().__init__(message, code)
+ pass
class PaymentRequiredException(ClientException):
-
- def __init__(self, message, code):
- super().__init__(message, code)
+ pass
class ForbiddenException(ClientException):
-
- def __init__(self, message, code):
- super().__init__(message, code)
+ pass
class NotFoundException(ClientException):
-
- def __init__(self, message, code):
- super().__init__(message, code)
+ pass
class PayloadTooLargeException(ClientException):
+ pass
+
- def __init__(self, message, code):
- super().__init__(message, code)
+class PayloadEmptyException(ClientException):
+ pass
class TooManyRequestsException(ClientException):
+ pass
- def __init__(self, message, code):
- super().__init__(message, code)
+class LiteTopicQuotaExceededException(ClientException):
+ pass
-class RequestHeaderFieldsTooLargeException(ClientException):
- def __init__(self, message, code):
- super().__init__(message, code)
+class LiteSubscriptionQuotaExceededException(ClientException):
+ pass
-class InternalErrorException(ClientException):
+class RequestHeaderFieldsTooLargeException(ClientException):
+ pass
- def __init__(self, message, code):
- super().__init__(message, code)
+class InternalErrorException(ClientException):
+ pass
-class ProxyTimeoutException(ClientException):
- def __init__(self, message, code):
- super().__init__(message, code)
+class ProxyTimeoutException(ClientException):
+ pass
class UnsupportedException(ClientException):
-
- def __init__(self, message, code=None):
- super().__init__(message, code)
+ pass
class IllegalArgumentException(ClientException):
diff --git a/python/rocketmq/v5/model/message.py
b/python/rocketmq/v5/model/message.py
index a94925f7..ce63334d 100644
--- a/python/rocketmq/v5/model/message.py
+++ b/python/rocketmq/v5/model/message.py
@@ -24,6 +24,7 @@ class Message:
def __init__(self):
self.__body = None
self.__topic = None
+ self.__lite_topic = None
self.__namespace = None
self.__message_id = None
self.__tag = None
@@ -51,6 +52,8 @@ class Message:
try:
self.__message_body_check_sum(message)
self.__topic = message.topic.name
+ if message.system_properties.lite_topic:
+ self.__lite_topic = message.system_properties.lite_topic
self.__namespace = message.topic.resource_namespace
self.__message_id = message.system_properties.message_id
self.__body = self.__uncompress_body(message)
@@ -89,14 +92,14 @@ class Message:
if message.system_properties.body_digest.checksum != md5_sum:
self.__corrupted = True
logger.error(
- f"(body_check_sum exception, topic:
{message.topic.name}, message_id: {message.system_properties.message_id}
{message.digest.checksum} != crc32_sum {md5_sum}"
+ f"(body_check_sum exception, topic:
{message.topic.name}, message_id: {message.system_properties.message_id}
{message.digest.checksum} != md5_checksum {md5_sum}"
)
elif message.system_properties.body_digest.type == DigestType.SHA1:
sha1_sum = Misc.sha1_checksum(message.body)
if message.system_properties.body_digest.checksum != sha1_sum:
self.__corrupted = True
logger.error(
- f"(body_check_sum exception, topic:
{message.topic.name}, message_id: {message.system_properties.message_id}
{message.digest.checksum} != crc32_sum {sha1_sum}"
+ f"(body_check_sum exception, topic:
{message.topic.name}, message_id: {message.system_properties.message_id}
{message.digest.checksum} != sha1_checksum {sha1_sum}"
)
else:
logger.error(
@@ -130,6 +133,10 @@ class Message:
def topic(self):
return self.__topic
+ @property
+ def lite_topic(self):
+ return self.__lite_topic
+
@property
def namespace(self):
return self.__namespace
@@ -195,50 +202,58 @@ class Message:
return self.__corrupted
@body.setter
- def body(self, body):
- if body is None or body.strip() == "":
- raise IllegalArgumentException("body should not be blank")
+ def body(self, body: bytes):
+ if not body:
+ raise IllegalArgumentException("body should not be blank.")
self.__body = body
@topic.setter
- def topic(self, topic):
- if topic is None or topic.strip() == "":
- raise IllegalArgumentException("topic has not been set yet")
+ def topic(self, topic: str):
+ if not topic or not topic.strip():
+ raise IllegalArgumentException("topic has not been set yet.")
if Misc.is_valid_topic(topic):
self.__topic = topic
else:
- raise IllegalArgumentException(
- f"topic does not match the regex [regex={Misc.TOPIC_PATTERN}]"
- )
+ raise IllegalArgumentException(f"topic does not match the regex
[regex={Misc.TOPIC_PATTERN}].")
+
+ @lite_topic.setter
+ def lite_topic(self, lite_topic: str):
+ if not lite_topic or not lite_topic.strip():
+ raise IllegalArgumentException("lite_topic has not been set yet.")
+ if self.__message_group:
+ raise IllegalArgumentException("lite_topic and message_group
should not be set at same time.")
+ if self.__delivery_timestamp:
+ raise IllegalArgumentException("lite_topic and delivery_timestamp
should not be set at same time.")
+ self.__lite_topic = lite_topic
@message_id.setter
def message_id(self, message_id):
self.__message_id = message_id
@tag.setter
- def tag(self, tag):
- if tag is None or tag.strip() == "":
- raise IllegalArgumentException("tag should not be blank")
+ def tag(self, tag: str):
+ if not tag or not tag.strip():
+ raise IllegalArgumentException("tag should not be blank.")
if "|" in tag:
- raise IllegalArgumentException('tag should not contain "|"')
+ raise IllegalArgumentException('tag should not contain "|".')
self.__tag = tag
@message_group.setter
- def message_group(self, message_group):
- if self.__delivery_timestamp is not None:
- raise IllegalArgumentException(
- "deliveryTimestamp and messageGroup should not be set at same
time"
- )
- if message_group is None or len(message_group) == 0:
- raise IllegalArgumentException("messageGroup should not be blank")
+ def message_group(self, message_group: str):
+ if not message_group or not message_group.strip():
+ raise IllegalArgumentException("message_group should not be
blank.")
+ if self.__delivery_timestamp:
+ raise IllegalArgumentException("delivery_timestamp and
message_group should not be set at same time.")
+ if self.__lite_topic:
+ raise IllegalArgumentException("lite_topic and message_group
should not be set at same time.")
self.__message_group = message_group
@delivery_timestamp.setter
- def delivery_timestamp(self, delivery_timestamp):
- if self.__message_group is not None:
- raise IllegalArgumentException(
- "deliveryTimestamp and messageGroup should not be set at same
time"
- )
+ def delivery_timestamp(self, delivery_timestamp: int):
+ if self.__message_group:
+ raise IllegalArgumentException("delivery_timestamp and
message_group should not be set at same time.")
+ if self.__lite_topic:
+ raise IllegalArgumentException("lite_topic and delivery_timestamp
should not be set at same time.")
self.__delivery_timestamp = delivery_timestamp
@transport_delivery_timestamp.setter
@@ -246,10 +261,10 @@ class Message:
self.__transport_delivery_timestamp = transport_delivery_timestamp
@keys.setter
- def keys(self, *keys):
+ def keys(self, *keys: str):
for key in keys:
- if not key or key.strip() == "":
- raise IllegalArgumentException("key should not be blank")
+ if not key or not key.strip():
+ raise IllegalArgumentException("key should not be blank.")
self.__keys.update(set(keys))
@receipt_handle.setter
@@ -264,11 +279,11 @@ class Message:
def endpoints(self, endpoints):
self.__endpoints = endpoints
- def add_property(self, key, value):
- if key is None or key.strip() == "":
- raise IllegalArgumentException("key should not be blank")
- if value is None or value.strip() == "":
- raise IllegalArgumentException("value should not be blank")
+ def add_property(self, key: str, value: str):
+ if not key or not key.strip():
+ raise IllegalArgumentException("key should not be blank.")
+ if not value or not value.strip():
+ raise IllegalArgumentException("value should not be blank.")
self.__properties[key] = value
@staticmethod
@@ -281,5 +296,7 @@ class Message:
return "DELAY"
elif message_type == 4:
return "TRANSACTION"
+ elif message_type == 5:
+ return "LITE"
else:
return "MESSAGE_TYPE_UNSPECIFIED"
diff --git a/python/rocketmq/v5/model/process_queue.py
b/python/rocketmq/v5/model/process_queue.py
index 4d876485..04a1d2d6 100644
--- a/python/rocketmq/v5/model/process_queue.py
+++ b/python/rocketmq/v5/model/process_queue.py
@@ -72,7 +72,7 @@ class ProcessQueue:
def expired(self, long_polling_timeout, request_timeout):
max_idle_duration = (long_polling_timeout + request_timeout) * 3
idle_duration = int(time.time()) - self.__active_time
- if idle_duration < 0:
+ if idle_duration - max_idle_duration < 0:
return False
after_cache_full_duration = int(time.time()) - self.__cache_full_time
if after_cache_full_duration - max_idle_duration < 0:
diff --git a/python/rocketmq/v5/model/retry_policy.py
b/python/rocketmq/v5/model/retry_policy.py
index 063d6424..3e6d7092 100644
--- a/python/rocketmq/v5/model/retry_policy.py
+++ b/python/rocketmq/v5/model/retry_policy.py
@@ -18,14 +18,14 @@ from rocketmq.v5.exception import IllegalArgumentException
class RetryPolicy:
- DEFAULT_MAX_ATTEMPTS = 17
- DEFAULT_RECONSUME_DELAY = 1
+ DEFAULT_RECONSUME_DELAY = 1 # seconds
+ DEFAULT_RESEND_DELAY = 1 # seconds
- def __init__(self, backoff_policy):
+ def __init__(self, backoff_policy, max_attempts):
if backoff_policy:
self.__max_attempts = backoff_policy.max_attempts
else:
- self.__max_attempts = RetryPolicy.DEFAULT_MAX_ATTEMPTS
+ self.__max_attempts = max_attempts
@property
def max_attempts(self):
@@ -34,13 +34,16 @@ class RetryPolicy:
class CustomizedBackoffRetryPolicy(RetryPolicy):
- def __init__(self, backoff_policy):
- super().__init__(backoff_policy)
+ def __init__(self, backoff_policy, default_max_attempts):
+ super().__init__(backoff_policy, default_max_attempts)
if backoff_policy:
self.__durations = list(map(lambda item: item.seconds,
backoff_policy.customized_backoff.next))
else:
self.__durations = list()
+ def __eq__(self, other):
+ return self.max_attempts == other.max_attempts and self.__durations ==
other.__durations
+
def get_next_attempt_delay(self, attempt):
if attempt < 0:
raise IllegalArgumentException("attempt must be positive")
@@ -49,3 +52,48 @@ class CustomizedBackoffRetryPolicy(RetryPolicy):
return self.__durations[size - 1] if attempt > size else
self.__durations[attempt - 1]
else:
return RetryPolicy.DEFAULT_RECONSUME_DELAY
+
+ @property
+ def durations(self):
+ return self.__durations
+
+
+class ExponentialBackoffRetryPolicy(RetryPolicy):
+
+ def __init__(self, backoff_policy, default_max_attempts):
+ super().__init__(backoff_policy, default_max_attempts)
+ if backoff_policy:
+ self.__initial_backoff =
backoff_policy.exponential_backoff.initial.seconds * 1_000_000_000 +
backoff_policy.exponential_backoff.initial.nanos # nanos
+ self.__max_backoff =
backoff_policy.exponential_backoff.max.seconds * 1_000_000_000 +
backoff_policy.exponential_backoff.max.nanos # nanos
+ self.__multiplier = backoff_policy.exponential_backoff.multiplier
+ else:
+ self.__initial_backoff = None
+ self.__max_backoff = None
+ self.__multiplier = None
+
+ def __eq__(self, other):
+ return self.max_attempts == other.max_attempts and
self.initial_backoff == other.initial_backoff and self.max_backoff ==
other.max_backoff and self.multiplier == other.multiplier
+
+ def get_next_attempt_delay(self, attempt):
+ if attempt < 0:
+ raise IllegalArgumentException("attempt must be positive")
+ if self.__initial_backoff and self.__max_backoff and self.__multiplier:
+ exp_backoff_nanos = self.__initial_backoff * (self.__multiplier **
(attempt - 1))
+ delay_nanos = int(min(exp_backoff_nanos, self.__max_backoff))
+ if delay_nanos <= 0:
+ return 0
+ return delay_nanos / 1_000_000_000
+ else:
+ return RetryPolicy.DEFAULT_RESEND_DELAY
+
+ @property
+ def initial_backoff(self):
+ return self.__initial_backoff
+
+ @property
+ def max_backoff(self):
+ return self.__max_backoff
+
+ @property
+ def multiplier(self):
+ return self.__multiplier
diff --git a/python/rocketmq/v5/producer/producer.py
b/python/rocketmq/v5/producer/producer.py
index 0f16cf3e..ec325005 100644
--- a/python/rocketmq/v5/producer/producer.py
+++ b/python/rocketmq/v5/producer/producer.py
@@ -33,6 +33,7 @@ from rocketmq.v5.exception import (ClientException,
IllegalArgumentException,
TooManyRequestsException)
from rocketmq.v5.log import logger
from rocketmq.v5.model import CallbackResult, Message, SendReceipt
+from rocketmq.v5.model.retry_policy import ExponentialBackoffRetryPolicy
from rocketmq.v5.util import (ConcurrentMap, MessageIdCodec,
MessagingResultChecker, Misc)
@@ -125,7 +126,9 @@ class TransactionChecker(metaclass=abc.ABCMeta):
class Producer(Client):
- MAX_SEND_ATTEMPTS = 3 # max retry times when send failed
+
+ DEFAULT_MAX_SEND_ATTEMPTS = 3 # default max retry times when send failed
+ DEFAULT_MAX_BODY_SIZE = 4 * 1024 * 1024 # default max message body size
def __init__(
self, client_configuration, topics=None, checker=None, tls_enable=False
@@ -137,6 +140,12 @@ class Producer(Client):
checker # checker for transaction message, handle checking from
server
)
self.__transaction_check_executor = ThreadPoolExecutor(max_workers=1,
thread_name_prefix="transaction_check_thread")
+ self.__max_body_size = Producer.DEFAULT_MAX_BODY_SIZE
+ self.__validate_message_type = True
+ self.__backoff_policy = ExponentialBackoffRetryPolicy(
+ None,
+ Producer.DEFAULT_MAX_SEND_ATTEMPTS
+ )
def __str__(self):
return f"{ClientType.Name(self.client_type)}
client_id:{self.client_id}"
@@ -149,7 +158,7 @@ class Producer(Client):
self.__wrap_sending_message(message, False if transaction is None else
True)
topic_queue = self.__select_send_queue(message)
- if message.message_type not in topic_queue.accept_message_types:
+ if self.__validate_message_type and message.message_type not in
topic_queue.accept_message_types:
raise IllegalArgumentException(
f"current message type not match with queue accept message
types, topic:{message.topic},
message_type:{Message.message_type_desc(message.message_type)}, queue access
type:{topic_queue.accept_message_types_desc()}"
)
@@ -264,6 +273,19 @@ class Producer(Client):
""" override """
+ def reset_setting(self, settings):
+ self.__max_body_size = settings.publishing.max_body_size
+ self.__validate_message_type =
settings.publishing.validate_message_type
+ use_exponential = settings and settings.backoff_policy and
settings.backoff_policy.WhichOneof("strategy") == "exponential_backoff"
+ new_policy = ExponentialBackoffRetryPolicy(
+ settings.backoff_policy if use_exponential else None,
+ Producer.DEFAULT_MAX_SEND_ATTEMPTS
+ )
+ if use_exponential and self.__backoff_policy != new_policy:
+ self.__backoff_policy = new_policy
+ if not self._init_settings_event.is_set():
+ self._init_settings_event.set()
+
def _on_start(self):
logger.info(f"{self} start success.")
@@ -355,7 +377,7 @@ class Producer(Client):
retry_exception_future = self.__check_send_retry_condition(
message, topic_queue, attempt, e
)
- if retry_exception_future is not None:
+ if retry_exception_future:
# end retry with exception
self.client_metrics.send_after(send_metric_context, False)
raise retry_exception_future.exception()
@@ -435,7 +457,7 @@ class Producer(Client):
def __check_send_retry_condition(self, message, topic_queue, attempt, e):
end_retry = False
- if attempt > Producer.MAX_SEND_ATTEMPTS:
+ if attempt > self.__backoff_policy.max_attempts:
logger.error(
f"{self} failed to send message to {topic_queue.endpoints},
because of run out of attempt times, topic:{message.topic},
message_id:{message.message_id}, message_type:{message.message_type},
attempt:{attempt}"
)
@@ -465,7 +487,7 @@ class Producer(Client):
msg = req.messages.add()
msg.topic.name = message.topic
msg.topic.resource_namespace = self.client_configuration.namespace
- if message.body is None or len(message.body) == 0:
+ if not message.body or len(message.body) == 0:
raise IllegalArgumentException("message body is none.")
max_body_size = 4 * 1024 * 1024 # max body size is 4m
if len(message.body) > max_body_size:
@@ -474,20 +496,23 @@ class Producer(Client):
)
msg.body = message.body
- if message.tag is not None:
+ if message.lite_topic:
+ msg.system_properties.lite_topic = message.lite_topic
+ if message.tag:
msg.system_properties.tag = message.tag
- if message.keys is not None:
+ if message.keys:
msg.system_properties.keys.extend(message.keys)
- if message.properties is not None:
+ if message.properties:
msg.user_properties.update(message.properties)
msg.system_properties.message_id = message.message_id
msg.system_properties.message_type = message.message_type
msg.system_properties.born_timestamp.seconds = int(time.time())
msg.system_properties.born_host = Misc.get_local_ip()
msg.system_properties.body_encoding = Encoding.IDENTITY
- if message.message_group is not None:
+
+ if message.message_group:
msg.system_properties.message_group = message.message_group
- if message.delivery_timestamp is not None:
+ if message.delivery_timestamp:
msg.system_properties.delivery_timestamp.seconds = (
message.delivery_timestamp
)
@@ -497,22 +522,26 @@ class Producer(Client):
def __send_message_type(self, message: Message, is_transaction=False):
if (
- message.message_group is None
- and message.delivery_timestamp is None
- and is_transaction is False
+ not message.message_group
+ and not message.lite_topic
+ and not message.delivery_timestamp
+ and not is_transaction
):
return MessageType.NORMAL
- if message.message_group is not None and is_transaction is False:
+ if message.message_group and not is_transaction:
return MessageType.FIFO
- if message.delivery_timestamp is not None and is_transaction is False:
+ if message.lite_topic and not is_transaction:
+ return MessageType.LITE
+
+ if message.delivery_timestamp and not is_transaction:
return MessageType.DELAY
if (
- message.message_group is None
- and message.delivery_timestamp is None
- and is_transaction is True
+ not message.message_group
+ and not message.delivery_timestamp
+ and is_transaction
):
return MessageType.TRANSACTION
@@ -595,7 +624,7 @@ class Producer(Client):
def __server_transaction_check_callback(self, future, message,
transaction_id, result):
try:
res = future.result()
- if res is not None and res.status.code == Code.OK:
+ if res and res.status.code == Code.OK:
if result == TransactionResolution.COMMIT:
logger.debug(
f"{self} commit message. message_id:
{message.message_id}, transaction_id: {transaction_id}, res: {res}"
diff --git a/python/rocketmq/v5/util/messaging_result_checker.py
b/python/rocketmq/v5/util/messaging_result_checker.py
index 4091397a..f294be16 100644
--- a/python/rocketmq/v5/util/messaging_result_checker.py
+++ b/python/rocketmq/v5/util/messaging_result_checker.py
@@ -15,7 +15,11 @@
from rocketmq.grpc_protocol import Code, Status
from rocketmq.v5.exception import (BadRequestException, ForbiddenException,
- InternalErrorException, NotFoundException,
+ IllegalStateException,
+ InternalErrorException,
+ LiteSubscriptionQuotaExceededException,
+ LiteTopicQuotaExceededException,
+ NotFoundException, PayloadEmptyException,
PayloadTooLargeException,
PaymentRequiredException,
ProxyTimeoutException,
@@ -29,6 +33,9 @@ class MessagingResultChecker:
@staticmethod
def check(status: Status):
+ if not status or not status.code:
+ raise IllegalStateException("response is illegal.")
+
code = status.code
message = status.message
@@ -42,6 +49,7 @@ class MessagingResultChecker:
or code == Code.ILLEGAL_MESSAGE_TAG
or code == Code.ILLEGAL_MESSAGE_KEY
or code == Code.ILLEGAL_MESSAGE_GROUP
+ or code == Code.ILLEGAL_LITE_TOPIC
or code == Code.ILLEGAL_MESSAGE_PROPERTY_KEY
or code == Code.INVALID_TRANSACTION_ID
or code == Code.ILLEGAL_MESSAGE_ID
@@ -72,8 +80,14 @@ class MessagingResultChecker:
raise NotFoundException(message, code)
elif code == Code.PAYLOAD_TOO_LARGE or code ==
Code.MESSAGE_BODY_TOO_LARGE:
raise PayloadTooLargeException(message, code)
+ elif code == Code.MESSAGE_BODY_EMPTY:
+ raise PayloadEmptyException(message, code)
elif code == Code.TOO_MANY_REQUESTS:
raise TooManyRequestsException(message, code)
+ elif code == Code.LITE_TOPIC_QUOTA_EXCEEDED:
+ raise LiteTopicQuotaExceededException(message, code)
+ elif code == Code.LITE_SUBSCRIPTION_QUOTA_EXCEEDED:
+ raise LiteSubscriptionQuotaExceededException(message, code)
elif (
code == Code.REQUEST_HEADER_FIELDS_TOO_LARGE
or code == Code.MESSAGE_PROPERTIES_TOO_LARGE
diff --git a/python/rocketmq/v5/util/misc.py b/python/rocketmq/v5/util/misc.py
index be3e5d3d..a377ab4f 100644
--- a/python/rocketmq/v5/util/misc.py
+++ b/python/rocketmq/v5/util/misc.py
@@ -31,7 +31,7 @@ class Misc:
__OS_NAME = None
TOPIC_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$")
CONSUMER_GROUP_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$")
- SDK_VERSION = "5.0.9"
+ SDK_VERSION = "5.1.0"
@staticmethod
def sdk_language():