This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b838e73 Release Python SDK 5.1.0 (#1167)
0b838e73 is described below

commit 0b838e7382e066a37df7a4b68e701fe182ec435e
Author: zhouli11 <[email protected]>
AuthorDate: Mon Jan 5 17:36:36 2026 +0800

    Release Python SDK 5.1.0 (#1167)
---
 README-CN.md                                       |   8 +-
 README.md                                          |   8 +-
 ...er_example.py => lite_push_consumer_example.py} |  34 +++--
 python/example/push_consumer_example.py            |  16 +--
 python/rocketmq/__init__.py                        |   5 +-
 python/rocketmq/grpc_protocol/__init__.py          |   7 +-
 python/rocketmq/grpc_protocol/definition_pb2.py    |  94 ++++++------
 .../rocketmq/grpc_protocol/proto/definition.proto  |  29 ++++
 python/rocketmq/grpc_protocol/proto/service.proto  |  28 ++++
 python/rocketmq/grpc_protocol/service_pb2.py       | 135 ++++++++---------
 python/rocketmq/grpc_protocol/service_pb2_grpc.py  |  34 +++++
 python/rocketmq/v5/client/client.py                |  28 ++--
 .../rocketmq/v5/client/connection/rpc_channel.py   |  19 ++-
 python/rocketmq/v5/client/connection/rpc_client.py |  11 +-
 python/rocketmq/v5/consumer/__init__.py            |   7 +-
 python/rocketmq/v5/consumer/consumer.py            |  21 +--
 python/rocketmq/v5/consumer/{ => push}/__init__.py |   4 +-
 .../rocketmq/v5/consumer/{ => push}/consumption.py |   3 +-
 .../v5/consumer/{ => push}/fifo_consumption.py     |  11 +-
 .../v5/consumer/push/lite_push_consumer.py         | 160 +++++++++++++++++++++
 .../v5/consumer/{ => push}/message_listener.py     |   0
 .../v5/consumer/{ => push}/push_consumer.py        |  84 +++++++----
 .../rocketmq/v5/consumer/{ => simple}/__init__.py  |   5 -
 .../v5/consumer/{ => simple}/simple_consumer.py    |  18 +++
 python/rocketmq/v5/exception/__init__.py           |   8 +-
 python/rocketmq/v5/exception/client_exception.py   |  54 +++----
 python/rocketmq/v5/model/message.py                |  87 ++++++-----
 python/rocketmq/v5/model/process_queue.py          |   2 +-
 python/rocketmq/v5/model/retry_policy.py           |  60 +++++++-
 python/rocketmq/v5/producer/producer.py            |  67 ++++++---
 .../rocketmq/v5/util/messaging_result_checker.py   |  16 ++-
 python/rocketmq/v5/util/misc.py                    |   2 +-
 32 files changed, 736 insertions(+), 329 deletions(-)

diff --git a/README-CN.md b/README-CN.md
index 88fb3023..781a3b62 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -23,11 +23,11 @@
 | Producer with FIFO messages                    |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
 | Producer with timed/delay messages             |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
 | Producer with transactional messages           |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
-| Producer with recalling timed/delay messages   |   ✅   |   ✅   |   🚧   |   🚧 
   |   🚧   |   🚧    |    🚧    |   🚧   |
+| Producer with recalling timed/delay messages   |   ✅   |   ✅   |   🚧   |   🚧 
   |   🚧   |   ✅    |    🚧    |   🚧   |
 | Simple consumer                                |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
-| Push consumer with concurrent message listener |   ✅   |   ✅   |   ✅   |   🚧 
   |   ✅   |   🚧    |    🚧    |   🚧   |
-| Push consumer with FIFO message listener       |   ✅   |   ✅   |   ✅   |   🚧 
   |   ✅   |   🚧    |    🚧    |   🚧   |
-| Push consumer with FIFO consume accelerator    |   ✅   |   ✅   |   🚧   |   🚧 
   |   🚧   |   🚧    |    🚧    |   🚧   |
+| Push consumer with concurrent message listener |   ✅   |   ✅   |   ✅   |   🚧 
   |   ✅   |   ✅    |    🚧    |   🚧   |
+| Push consumer with FIFO message listener       |   ✅   |   ✅   |   ✅   |   🚧 
   |   ✅   |   ✅    |    🚧    |   🚧   |
+| Push consumer with FIFO consume accelerator    |   ✅   |   ✅   |   🚧   |   🚧 
   |   🚧   |   ✅    |    🚧    |   🚧   |
 | Priority Message                               |   ✅   |   🚧   |   🚧   |   🚧 
   |   🚧   |   🚧    |    🚧    |   🚧   |
 
 ## 先决条件和构建
diff --git a/README.md b/README.md
index 8f985f2d..ed166f46 100644
--- a/README.md
+++ b/README.md
@@ -23,11 +23,11 @@ Provide cloud-native and robust solutions for Java, C++, C#, Golang, Rust and al
 | Producer with FIFO messages                    |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
 | Producer with timed/delay messages             |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
 | Producer with transactional messages           |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
-| Producer with recalling timed/delay messages   |   ✅   |   ✅   |   🚧   |   🚧 
   |   🚧   |   🚧    |    🚧    |   🚧   |
+| Producer with recalling timed/delay messages   |   ✅   |   ✅   |   🚧   |   🚧 
   |   🚧   |   ✅    |    🚧    |   🚧   |
 | Simple consumer                                |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
-| Push consumer with concurrent message listener |   ✅   |   ✅   |   ✅   |   🚧 
   |   ✅   |   🚧    |    🚧    |   🚧   |
-| Push consumer with FIFO message listener       |   ✅   |   ✅   |   ✅   |   🚧 
   |   ✅   |   🚧    |    🚧    |   🚧   |
-| Push consumer with FIFO consume accelerator    |   ✅   |   ✅   |   🚧   |   🚧 
   |   🚧   |   🚧    |    🚧    |   🚧   |
+| Push consumer with concurrent message listener |   ✅   |   ✅   |   ✅   |   🚧 
   |   ✅   |   ✅    |    🚧    |   🚧   |
+| Push consumer with FIFO message listener       |   ✅   |   ✅   |   ✅   |   🚧 
   |   ✅   |   ✅    |    🚧    |   🚧   |
+| Push consumer with FIFO consume accelerator    |   ✅   |   ✅   |   🚧   |   🚧 
   |   🚧   |   ✅    |    🚧    |   🚧   |
 | Priority Message                               |   ✅   |   🚧   |   🚧   |   🚧 
   |   🚧   |   🚧    |    🚧    |   🚧   |
 
 ## Prerequisite and Build
diff --git a/python/example/push_consumer_example.py b/python/example/lite_push_consumer_example.py
similarity index 57%
copy from python/example/push_consumer_example.py
copy to python/example/lite_push_consumer_example.py
index c35b8f23..f398e474 100644
--- a/python/example/push_consumer_example.py
+++ b/python/example/lite_push_consumer_example.py
@@ -13,17 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from rocketmq import (ClientConfiguration, Credentials, FilterExpression,
-                      Message)
-from rocketmq.v5.consumer import PushConsumer
-from rocketmq.v5.consumer.message_listener import (ConsumeResult,
-                                                   MessageListener)
+from rocketmq import (ClientConfiguration, ConsumeResult, Credentials,
+                      LitePushConsumer, Message, MessageListener)
 
 
-class TestMessageListener(MessageListener):
+class LiteTopicTestMessageListener(MessageListener):
 
     def consume(self, message: Message) -> ConsumeResult:
-        print(f"consume message, topic:{message.topic}, message_id: {message.message_id}.")
+        print(f"consume message, topic:{message.topic}, lite_topic:{message.lite_topic}, message_id: {message.message_id}.")
         return ConsumeResult.SUCCESS
 
 
@@ -36,21 +33,22 @@ if __name__ == '__main__':
     config = ClientConfiguration(endpoints, credentials)
     # with namespace
     # config = ClientConfiguration(endpoints, credentials, "namespace")
-    topic = "topic"
+    bind_topic = "topic"
     consumer_group = "consumer-group"
-    # in most case, you don't need to create too many consumers, singleton pattern is recommended
-    # close the simple consumer when you don't need it anymore
-    push_consumer = PushConsumer(config, consumer_group, TestMessageListener(), {topic: FilterExpression()})
+    # A LitePushConsumer can only be bound to one topic
+    lite_push_consumer = LitePushConsumer(config, consumer_group, bind_topic, LiteTopicTestMessageListener())
 
     try:
-        push_consumer.startup()
+        lite_push_consumer.startup()
+        for i in range(0, 10):
+            lite_push_consumer.subscribe_lite("lite-test-" + str(i))
         try:
             input("Please Enter to Stop the Application.\r\n")
         except Exception as e:
-            print(f"{push_consumer} raise exception: {e}")
-            push_consumer.shutdown()
-            print(f"{push_consumer} shutdown.")
+            print(f"{lite_push_consumer} raise exception: {e}")
+            lite_push_consumer.shutdown()
+            print(f"{lite_push_consumer} shutdown.")
     except Exception as e:
-        print(f"{push_consumer} startup raise exception: {e}")
-        push_consumer.shutdown()
-        print(f"{push_consumer} shutdown.")
+        print(f"{lite_push_consumer} startup raise exception: {e}")
+        lite_push_consumer.shutdown()
+        print(f"{lite_push_consumer} shutdown.")
diff --git a/python/example/push_consumer_example.py b/python/example/push_consumer_example.py
index c35b8f23..0baa5099 100644
--- a/python/example/push_consumer_example.py
+++ b/python/example/push_consumer_example.py
@@ -13,11 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from rocketmq import (ClientConfiguration, Credentials, FilterExpression,
-                      Message)
-from rocketmq.v5.consumer import PushConsumer
-from rocketmq.v5.consumer.message_listener import (ConsumeResult,
-                                                   MessageListener)
+from rocketmq import (ClientConfiguration, ConsumeResult, Credentials,
+                      FilterExpression, Message, MessageListener, PushConsumer)
 
 
 class TestMessageListener(MessageListener):
@@ -37,10 +34,10 @@ if __name__ == '__main__':
     # with namespace
     # config = ClientConfiguration(endpoints, credentials, "namespace")
     topic = "topic"
-    consumer_group = "consumer-group"
+    consumer_group = "consumer_group"
     # in most case, you don't need to create too many consumers, singleton pattern is recommended
-    # close the simple consumer when you don't need it anymore
-    push_consumer = PushConsumer(config, consumer_group, TestMessageListener(), {topic: FilterExpression()})
+    # close the push consumer when you don't need it anymore
+    push_consumer = PushConsumer(config, consumer_group, TestMessageListener(), {topic: FilterExpression(), })
 
     try:
         push_consumer.startup()
@@ -52,5 +49,6 @@ if __name__ == '__main__':
             print(f"{push_consumer} shutdown.")
     except Exception as e:
         print(f"{push_consumer} startup raise exception: {e}")
-        push_consumer.shutdown()
+        if push_consumer.is_running:
+            push_consumer.shutdown()
         print(f"{push_consumer} shutdown.")
diff --git a/python/rocketmq/__init__.py b/python/rocketmq/__init__.py
index 420795b9..d5509716 100644
--- a/python/rocketmq/__init__.py
+++ b/python/rocketmq/__init__.py
@@ -16,8 +16,8 @@
 from rocketmq.grpc_protocol import TransactionResolution
 
 from .v5.client import ClientConfiguration, Credentials
-from .v5.consumer import (ConsumeResult, MessageListener, PushConsumer,
-                          SimpleConsumer)
+from .v5.consumer import (ConsumeResult, LitePushConsumer, MessageListener,
+                          PushConsumer, SimpleConsumer)
 from .v5.model import FilterExpression, Message, SendReceipt
 from .v5.producer import Producer, Transaction, TransactionChecker
 
@@ -28,6 +28,7 @@ __all__ = [
     "TransactionResolution", # noqa
     "SimpleConsumer",
     "PushConsumer",
+    "LitePushConsumer",
     "MessageListener",
     "ConsumeResult",
     "Message",
diff --git a/python/rocketmq/grpc_protocol/__init__.py b/python/rocketmq/grpc_protocol/__init__.py
index 5f3ff16c..cd6a0557 100644
--- a/python/rocketmq/grpc_protocol/__init__.py
+++ b/python/rocketmq/grpc_protocol/__init__.py
@@ -15,7 +15,7 @@
 
 from .definition_pb2 import (Address, AddressScheme, Broker,  # noqa
                              ClientType, Code, DigestType, Encoding, 
Endpoints, # noqa
-                             FilterType, Language, MessageType, Metric, # noqa
+                             FilterType, Language, LiteSubscriptionAction, 
MessageType, Metric, # noqa
                              Permission, Publishing, Resource, Settings, # noqa
                              Status, Subscription, TransactionResolution, # 
noqa
                              TransactionSource) # noqa
@@ -24,7 +24,8 @@ from .service_pb2 import (AckMessageEntry, AckMessageRequest, 
 # noqa
                           EndTransactionRequest, HeartbeatRequest, # noqa
                           NotifyClientTerminationRequest, QueryRouteRequest, # 
noqa
                           ReceiveMessageRequest, SendMessageRequest, # noqa
-                          TelemetryCommand, RecallMessageRequest, 
QueryAssignmentRequest, # noqa
+                          SyncLiteSubscriptionRequest, TelemetryCommand, # noqa
+                          RecallMessageRequest, QueryAssignmentRequest, # noqa
                           ForwardMessageToDeadLetterQueueRequest) # noqa
 from .service_pb2_grpc import MessagingServiceStub
 
@@ -61,6 +62,8 @@ __all__ = [
     "TelemetryCommand", # noqa
     "RecallMessageRequest", # noqa
     "QueryAssignmentRequest", # noqa
+    "SyncLiteSubscriptionRequest", # noqa
+    "LiteSubscriptionAction", # noqa
     "ForwardMessageToDeadLetterQueueRequest", # noqa
     "MessagingServiceStub",
 ]
diff --git a/python/rocketmq/grpc_protocol/definition_pb2.py b/python/rocketmq/grpc_protocol/definition_pb2.py
index 9d85c8f1..924979a7 100644
--- a/python/rocketmq/grpc_protocol/definition_pb2.py
+++ b/python/rocketmq/grpc_protocol/definition_pb2.py
@@ -26,7 +26,7 @@ from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__
 from google.protobuf import duration_pb2 as google_dot_protobuf_dot_duration__pb2
 
 
-DESCRIPTOR = 
_descriptor_pool.Default().AddSerializedFile(b'\n\x10\x64\x65\x66inition.proto\x12\x12\x61pache.rocketmq.v2\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/duration.proto\"T\n\x10\x46ilterExpression\x12,\n\x04type\x18\x01
 \x01(\x0e\x32\x1e.apache.rocketmq.v2.FilterType\x12\x12\n\nexpression\x18\x02 
\x01(\t\"\xbb\x01\n\x0bRetryPolicy\x12\x14\n\x0cmax_attempts\x18\x01 
\x01(\x05\x12\x45\n\x13\x65xponential_backoff\x18\x02 
\x01(\x0b\x32&.apache.rocketmq.v2.Exponent [...]
+DESCRIPTOR = 
_descriptor_pool.Default().AddSerializedFile(b'\n\x10\x64\x65\x66inition.proto\x12\x12\x61pache.rocketmq.v2\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/duration.proto\"T\n\x10\x46ilterExpression\x12,\n\x04type\x18\x01
 \x01(\x0e\x32\x1e.apache.rocketmq.v2.FilterType\x12\x12\n\nexpression\x18\x02 
\x01(\t\"\xbb\x01\n\x0bRetryPolicy\x12\x14\n\x0cmax_attempts\x18\x01 
\x01(\x05\x12\x45\n\x13\x65xponential_backoff\x18\x02 
\x01(\x0b\x32&.apache.rocketmq.v2.Exponent [...]
 
 _globals = globals()
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -36,30 +36,32 @@ if not _descriptor._USE_C_DESCRIPTORS:
   _globals['DESCRIPTOR']._serialized_options = 
b'\n\022apache.rocketmq.v2B\010MQDomainP\001\240\001\001\330\001\001\252\002\022Apache.Rocketmq.V2'
   _globals['_MESSAGE_USERPROPERTIESENTRY']._loaded_options = None
   _globals['_MESSAGE_USERPROPERTIESENTRY']._serialized_options = b'8\001'
-  _globals['_TRANSACTIONRESOLUTION']._serialized_start=3943
-  _globals['_TRANSACTIONRESOLUTION']._serialized_end=4032
-  _globals['_TRANSACTIONSOURCE']._serialized_start=4034
-  _globals['_TRANSACTIONSOURCE']._serialized_end=4121
-  _globals['_PERMISSION']._serialized_start=4123
-  _globals['_PERMISSION']._serialized_end=4210
-  _globals['_FILTERTYPE']._serialized_start=4212
-  _globals['_FILTERTYPE']._serialized_end=4271
-  _globals['_ADDRESSSCHEME']._serialized_start=4273
-  _globals['_ADDRESSSCHEME']._serialized_end=4357
-  _globals['_MESSAGETYPE']._serialized_start=4359
-  _globals['_MESSAGETYPE']._serialized_end=4452
-  _globals['_DIGESTTYPE']._serialized_start=4454
-  _globals['_DIGESTTYPE']._serialized_end=4525
-  _globals['_CLIENTTYPE']._serialized_start=4527
-  _globals['_CLIENTTYPE']._serialized_end=4641
-  _globals['_ENCODING']._serialized_start=4643
-  _globals['_ENCODING']._serialized_end=4703
-  _globals['_CODE']._serialized_start=4706
-  _globals['_CODE']._serialized_end=6056
-  _globals['_LANGUAGE']._serialized_start=6059
-  _globals['_LANGUAGE']._serialized_end=6232
-  _globals['_QUERYOFFSETPOLICY']._serialized_start=6234
-  _globals['_QUERYOFFSETPOLICY']._serialized_end=6292
+  _globals['_TRANSACTIONRESOLUTION']._serialized_start=4107
+  _globals['_TRANSACTIONRESOLUTION']._serialized_end=4196
+  _globals['_TRANSACTIONSOURCE']._serialized_start=4198
+  _globals['_TRANSACTIONSOURCE']._serialized_end=4285
+  _globals['_PERMISSION']._serialized_start=4287
+  _globals['_PERMISSION']._serialized_end=4374
+  _globals['_FILTERTYPE']._serialized_start=4376
+  _globals['_FILTERTYPE']._serialized_end=4435
+  _globals['_ADDRESSSCHEME']._serialized_start=4437
+  _globals['_ADDRESSSCHEME']._serialized_end=4521
+  _globals['_MESSAGETYPE']._serialized_start=4523
+  _globals['_MESSAGETYPE']._serialized_end=4626
+  _globals['_DIGESTTYPE']._serialized_start=4628
+  _globals['_DIGESTTYPE']._serialized_end=4699
+  _globals['_CLIENTTYPE']._serialized_start=4702
+  _globals['_CLIENTTYPE']._serialized_end=4866
+  _globals['_ENCODING']._serialized_start=4868
+  _globals['_ENCODING']._serialized_end=4928
+  _globals['_CODE']._serialized_start=4931
+  _globals['_CODE']._serialized_end=6380
+  _globals['_LANGUAGE']._serialized_start=6383
+  _globals['_LANGUAGE']._serialized_end=6556
+  _globals['_LITESUBSCRIPTIONACTION']._serialized_start=6558
+  _globals['_LITESUBSCRIPTIONACTION']._serialized_end=6658
+  _globals['_QUERYOFFSETPOLICY']._serialized_start=6660
+  _globals['_QUERYOFFSETPOLICY']._serialized_end=6718
   _globals['_FILTEREXPRESSION']._serialized_start=105
   _globals['_FILTEREXPRESSION']._serialized_end=189
   _globals['_RETRYPOLICY']._serialized_start=192
@@ -83,25 +85,25 @@ if not _descriptor._USE_C_DESCRIPTORS:
   _globals['_DIGEST']._serialized_start=1217
   _globals['_DIGEST']._serialized_end=1289
   _globals['_SYSTEMPROPERTIES']._serialized_start=1292
-  _globals['_SYSTEMPROPERTIES']._serialized_end=2331
-  _globals['_DEADLETTERQUEUE']._serialized_start=2333
-  _globals['_DEADLETTERQUEUE']._serialized_end=2385
-  _globals['_MESSAGE']._serialized_start=2388
-  _globals['_MESSAGE']._serialized_end=2650
-  _globals['_MESSAGE_USERPROPERTIESENTRY']._serialized_start=2597
-  _globals['_MESSAGE_USERPROPERTIESENTRY']._serialized_end=2650
-  _globals['_ASSIGNMENT']._serialized_start=2652
-  _globals['_ASSIGNMENT']._serialized_end=2721
-  _globals['_STATUS']._serialized_start=2723
-  _globals['_STATUS']._serialized_end=2788
-  _globals['_UA']._serialized_start=2790
-  _globals['_UA']._serialized_end=2895
-  _globals['_SETTINGS']._serialized_start=2898
-  _globals['_SETTINGS']._serialized_end=3426
-  _globals['_PUBLISHING']._serialized_start=3428
-  _globals['_PUBLISHING']._serialized_end=3540
-  _globals['_SUBSCRIPTION']._serialized_start=3543
-  _globals['_SUBSCRIPTION']._serialized_end=3850
-  _globals['_METRIC']._serialized_start=3852
-  _globals['_METRIC']._serialized_end=3941
+  _globals['_SYSTEMPROPERTIES']._serialized_end=2371
+  _globals['_DEADLETTERQUEUE']._serialized_start=2373
+  _globals['_DEADLETTERQUEUE']._serialized_end=2425
+  _globals['_MESSAGE']._serialized_start=2428
+  _globals['_MESSAGE']._serialized_end=2690
+  _globals['_MESSAGE_USERPROPERTIESENTRY']._serialized_start=2637
+  _globals['_MESSAGE_USERPROPERTIESENTRY']._serialized_end=2690
+  _globals['_ASSIGNMENT']._serialized_start=2692
+  _globals['_ASSIGNMENT']._serialized_end=2761
+  _globals['_STATUS']._serialized_start=2763
+  _globals['_STATUS']._serialized_end=2828
+  _globals['_UA']._serialized_start=2830
+  _globals['_UA']._serialized_end=2935
+  _globals['_SETTINGS']._serialized_start=2938
+  _globals['_SETTINGS']._serialized_end=3466
+  _globals['_PUBLISHING']._serialized_start=3468
+  _globals['_PUBLISHING']._serialized_end=3580
+  _globals['_SUBSCRIPTION']._serialized_start=3583
+  _globals['_SUBSCRIPTION']._serialized_end=4014
+  _globals['_METRIC']._serialized_start=4016
+  _globals['_METRIC']._serialized_end=4105
 # @@protoc_insertion_point(module_scope)
diff --git a/python/rocketmq/grpc_protocol/proto/definition.proto b/python/rocketmq/grpc_protocol/proto/definition.proto
index 468c4105..2513bac0 100644
--- a/python/rocketmq/grpc_protocol/proto/definition.proto
+++ b/python/rocketmq/grpc_protocol/proto/definition.proto
@@ -146,6 +146,9 @@ enum MessageType {
   // Messages that are transactional. Only committed messages are delivered to
   // subscribers.
   TRANSACTION = 4;
+
+  // lite topic
+  LITE = 5;
 }
 
 enum DigestType {
@@ -186,6 +189,8 @@ enum ClientType {
   PUSH_CONSUMER = 2;
   SIMPLE_CONSUMER = 3;
   PULL_CONSUMER = 4;
+  LITE_PUSH_CONSUMER = 5;
+  LITE_SIMPLE_CONSUMER = 6;
 }
 
 enum Encoding {
@@ -270,6 +275,9 @@ message SystemProperties {
 
   // Information to identify whether this message is from dead letter queue.
   optional DeadLetterQueue dead_letter_queue = 20;
+
+  // lite topic
+  optional string lite_topic = 21;
 }
 
 message DeadLetterQueue {
@@ -348,6 +356,8 @@ enum Code {
   ILLEGAL_POLLING_TIME = 40018;
   // Offset is illegal.
   ILLEGAL_OFFSET = 40019;
+  // Format of lite topic is illegal.
+  ILLEGAL_LITE_TOPIC = 40020;
 
   // Generic code indicates that the client request lacks valid authentication
   // credentials for the requested resource.
@@ -389,6 +399,10 @@ enum Code {
   // Requests are throttled.
   TOO_MANY_REQUESTS = 42900;
 
+  // LiteTopic related quota exceeded
+  LITE_TOPIC_QUOTA_EXCEEDED = 42901;
+  LITE_SUBSCRIPTION_QUOTA_EXCEEDED = 42902;
+
   // Generic code for the case that the server is unwilling to process the request because its header fields are too large.
   // The request may be resubmitted after reducing the size of the request header fields.
   REQUEST_HEADER_FIELDS_TOO_LARGE = 43100;
@@ -548,6 +562,21 @@ message Subscription {
   // Long-polling timeout for `ReceiveMessageRequest`, which is essential for
   // push consumer.
   optional google.protobuf.Duration long_polling_timeout = 5;
+
+  // Only lite push consumer
+  // client-side lite subscription quota limit
+  optional int32 lite_subscription_quota = 6;
+
+  // Only lite push consumer
+  // Maximum length limit for lite topic
+  optional int32 max_lite_topic_size = 7;
+}
+
+enum LiteSubscriptionAction {
+  PARTIAL_ADD = 0;
+  PARTIAL_REMOVE = 1;
+  COMPLETE_ADD = 2;
+  COMPLETE_REMOVE = 3;
 }
 
 message Metric {
diff --git a/python/rocketmq/grpc_protocol/proto/service.proto b/python/rocketmq/grpc_protocol/proto/service.proto
index e5497f2d..ca809ff2 100644
--- a/python/rocketmq/grpc_protocol/proto/service.proto
+++ b/python/rocketmq/grpc_protocol/proto/service.proto
@@ -114,6 +114,7 @@ message ReceiveMessageResponse {
 message AckMessageEntry {
   string message_id = 1;
   string receipt_handle = 2;
+  optional string lite_topic = 3;
 }
 
 message AckMessageRequest {
@@ -148,6 +149,7 @@ message ForwardMessageToDeadLetterQueueRequest {
   string message_id = 4;
   int32 delivery_attempt = 5;
   int32 max_delivery_attempts = 6;
+  optional string lite_topic = 7;
 }
 
 message ForwardMessageToDeadLetterQueueResponse { Status status = 1; }
@@ -193,6 +195,10 @@ message RecoverOrphanedTransactionCommand {
   string transaction_id = 2;
 }
 
+message NotifyUnsubscribeLiteCommand {
+  string lite_topic = 1;
+}
+
 message TelemetryCommand {
   optional Status status = 1;
 
@@ -221,6 +227,9 @@ message TelemetryCommand {
 
     // Request client to reconnect server use the latest endpoints.
     ReconnectEndpointsCommand reconnect_endpoints_command = 8;
+
+    // Request client to unsubscribe lite topic.
+    NotifyUnsubscribeLiteCommand notify_unsubscribe_lite_command = 9;
   }
 }
 
@@ -311,6 +320,21 @@ message RecallMessageResponse {
   string message_id = 2;
 }
 
+message SyncLiteSubscriptionRequest {
+  LiteSubscriptionAction action = 1;
+  // bindTopic for lite push consumer
+  Resource topic = 2;
+  // consumer group
+  Resource group = 3;
+  // lite subscription set of lite topics
+  repeated string lite_topic_set = 4;
+  optional int64 version = 5;
+}
+
+message SyncLiteSubscriptionResponse {
+  Status status = 1;
+}
+
 // For all the RPCs in MessagingService, the following error handling policies
 // apply:
 //
@@ -440,4 +464,8 @@ service MessagingService {
   // for normal message, not supported for now.
   rpc RecallMessage(RecallMessageRequest) returns (RecallMessageResponse) {
   }
+
+  // Sync lite subscription info, lite push consumer only
+  rpc SyncLiteSubscription(SyncLiteSubscriptionRequest) returns (SyncLiteSubscriptionResponse) {}
+
 }
diff --git a/python/rocketmq/grpc_protocol/service_pb2.py b/python/rocketmq/grpc_protocol/service_pb2.py
index bf080be2..718fae85 100644
--- a/python/rocketmq/grpc_protocol/service_pb2.py
+++ b/python/rocketmq/grpc_protocol/service_pb2.py
@@ -26,8 +26,7 @@ from google.protobuf import duration_pb2 as google_dot_protobuf_dot_duration__pb
 from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2
 from rocketmq.grpc_protocol import definition_pb2 as definition__pb2
 
-
-DESCRIPTOR = 
_descriptor_pool.Default().AddSerializedFile(b'\n\rservice.proto\x12\x12\x61pache.rocketmq.v2\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x10\x64\x65\x66inition.proto\"r\n\x11QueryRouteRequest\x12+\n\x05topic\x18\x01
 \x01(\x0b\x32\x1c.apache.rocketmq.v2.Resource\x12\x30\n\tendpoints\x18\x02 
\x01(\x0b\x32\x1d.apache.rocketmq.v2.Endpoints\"z\n\x12QueryRouteResponse\x12*\n\x06status\x18\x01
 \x01(\x0b\x32\x1a.apache.rocketmq.v2.Status\x12\x38 [...]
+DESCRIPTOR = 
_descriptor_pool.Default().AddSerializedFile(b'\n\rservice.proto\x12\x12\x61pache.rocketmq.v2\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x10\x64\x65\x66inition.proto\"r\n\x11QueryRouteRequest\x12+\n\x05topic\x18\x01
 \x01(\x0b\x32\x1c.apache.rocketmq.v2.Resource\x12\x30\n\tendpoints\x18\x02 
\x01(\x0b\x32\x1d.apache.rocketmq.v2.Endpoints\"z\n\x12QueryRouteResponse\x12*\n\x06status\x18\x01
 \x01(\x0b\x32\x1a.apache.rocketmq.v2.Status\x12\x38 [...]
 
 _globals = globals()
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -54,67 +53,73 @@ if not _descriptor._USE_C_DESCRIPTORS:
   _globals['_RECEIVEMESSAGERESPONSE']._serialized_start=1432
   _globals['_RECEIVEMESSAGERESPONSE']._serialized_end=1619
   _globals['_ACKMESSAGEENTRY']._serialized_start=1621
-  _globals['_ACKMESSAGEENTRY']._serialized_end=1682
-  _globals['_ACKMESSAGEREQUEST']._serialized_start=1685
-  _globals['_ACKMESSAGEREQUEST']._serialized_end=1848
-  _globals['_ACKMESSAGERESULTENTRY']._serialized_start=1850
-  _globals['_ACKMESSAGERESULTENTRY']._serialized_end=1961
-  _globals['_ACKMESSAGERESPONSE']._serialized_start=1963
-  _globals['_ACKMESSAGERESPONSE']._serialized_end=2087
-  _globals['_FORWARDMESSAGETODEADLETTERQUEUEREQUEST']._serialized_start=2090
-  _globals['_FORWARDMESSAGETODEADLETTERQUEUEREQUEST']._serialized_end=2321
-  _globals['_FORWARDMESSAGETODEADLETTERQUEUERESPONSE']._serialized_start=2323
-  _globals['_FORWARDMESSAGETODEADLETTERQUEUERESPONSE']._serialized_end=2408
-  _globals['_HEARTBEATREQUEST']._serialized_start=2411
-  _globals['_HEARTBEATREQUEST']._serialized_end=2542
-  _globals['_HEARTBEATRESPONSE']._serialized_start=2544
-  _globals['_HEARTBEATRESPONSE']._serialized_end=2607
-  _globals['_ENDTRANSACTIONREQUEST']._serialized_start=2610
-  _globals['_ENDTRANSACTIONREQUEST']._serialized_end=2863
-  _globals['_ENDTRANSACTIONRESPONSE']._serialized_start=2865
-  _globals['_ENDTRANSACTIONRESPONSE']._serialized_end=2933
-  _globals['_PRINTTHREADSTACKTRACECOMMAND']._serialized_start=2935
-  _globals['_PRINTTHREADSTACKTRACECOMMAND']._serialized_end=2980
-  _globals['_RECONNECTENDPOINTSCOMMAND']._serialized_start=2982
-  _globals['_RECONNECTENDPOINTSCOMMAND']._serialized_end=3024
-  _globals['_THREADSTACKTRACE']._serialized_start=3026
-  _globals['_THREADSTACKTRACE']._serialized_end=3115
-  _globals['_VERIFYMESSAGECOMMAND']._serialized_start=3117
-  _globals['_VERIFYMESSAGECOMMAND']._serialized_end=3200
-  _globals['_VERIFYMESSAGERESULT']._serialized_start=3202
-  _globals['_VERIFYMESSAGERESULT']._serialized_end=3238
-  _globals['_RECOVERORPHANEDTRANSACTIONCOMMAND']._serialized_start=3240
-  _globals['_RECOVERORPHANEDTRANSACTIONCOMMAND']._serialized_end=3345
-  _globals['_TELEMETRYCOMMAND']._serialized_start=3348
-  _globals['_TELEMETRYCOMMAND']._serialized_end=3988
-  _globals['_NOTIFYCLIENTTERMINATIONREQUEST']._serialized_start=3990
-  _globals['_NOTIFYCLIENTTERMINATIONREQUEST']._serialized_end=4082
-  _globals['_NOTIFYCLIENTTERMINATIONRESPONSE']._serialized_start=4084
-  _globals['_NOTIFYCLIENTTERMINATIONRESPONSE']._serialized_end=4161
-  _globals['_CHANGEINVISIBLEDURATIONREQUEST']._serialized_start=4164
-  _globals['_CHANGEINVISIBLEDURATIONREQUEST']._serialized_end=4385
-  _globals['_CHANGEINVISIBLEDURATIONRESPONSE']._serialized_start=4387
-  _globals['_CHANGEINVISIBLEDURATIONRESPONSE']._serialized_end=4488
-  _globals['_PULLMESSAGEREQUEST']._serialized_start=4491
-  _globals['_PULLMESSAGEREQUEST']._serialized_end=4771
-  _globals['_PULLMESSAGERESPONSE']._serialized_start=4774
-  _globals['_PULLMESSAGERESPONSE']._serialized_end=4923
-  _globals['_UPDATEOFFSETREQUEST']._serialized_start=4926
-  _globals['_UPDATEOFFSETREQUEST']._serialized_end=5065
-  _globals['_UPDATEOFFSETRESPONSE']._serialized_start=5067
-  _globals['_UPDATEOFFSETRESPONSE']._serialized_end=5133
-  _globals['_GETOFFSETREQUEST']._serialized_start=5135
-  _globals['_GETOFFSETREQUEST']._serialized_end=5255
-  _globals['_GETOFFSETRESPONSE']._serialized_start=5257
-  _globals['_GETOFFSETRESPONSE']._serialized_end=5336
-  _globals['_QUERYOFFSETREQUEST']._serialized_start=5339
-  _globals['_QUERYOFFSETREQUEST']._serialized_end=5550
-  _globals['_QUERYOFFSETRESPONSE']._serialized_start=5552
-  _globals['_QUERYOFFSETRESPONSE']._serialized_end=5633
-  _globals['_RECALLMESSAGEREQUEST']._serialized_start=5635
-  _globals['_RECALLMESSAGEREQUEST']._serialized_end=5725
-  _globals['_RECALLMESSAGERESPONSE']._serialized_start=5727
-  _globals['_RECALLMESSAGERESPONSE']._serialized_end=5814
-  _globals['_MESSAGINGSERVICE']._serialized_start=5817
-  _globals['_MESSAGINGSERVICE']._serialized_end=7560
+  _globals['_ACKMESSAGEENTRY']._serialized_end=1722
+  _globals['_ACKMESSAGEREQUEST']._serialized_start=1725
+  _globals['_ACKMESSAGEREQUEST']._serialized_end=1888
+  _globals['_ACKMESSAGERESULTENTRY']._serialized_start=1890
+  _globals['_ACKMESSAGERESULTENTRY']._serialized_end=2001
+  _globals['_ACKMESSAGERESPONSE']._serialized_start=2003
+  _globals['_ACKMESSAGERESPONSE']._serialized_end=2127
+  _globals['_FORWARDMESSAGETODEADLETTERQUEUEREQUEST']._serialized_start=2130
+  _globals['_FORWARDMESSAGETODEADLETTERQUEUEREQUEST']._serialized_end=2401
+  _globals['_FORWARDMESSAGETODEADLETTERQUEUERESPONSE']._serialized_start=2403
+  _globals['_FORWARDMESSAGETODEADLETTERQUEUERESPONSE']._serialized_end=2488
+  _globals['_HEARTBEATREQUEST']._serialized_start=2491
+  _globals['_HEARTBEATREQUEST']._serialized_end=2622
+  _globals['_HEARTBEATRESPONSE']._serialized_start=2624
+  _globals['_HEARTBEATRESPONSE']._serialized_end=2687
+  _globals['_ENDTRANSACTIONREQUEST']._serialized_start=2690
+  _globals['_ENDTRANSACTIONREQUEST']._serialized_end=2943
+  _globals['_ENDTRANSACTIONRESPONSE']._serialized_start=2945
+  _globals['_ENDTRANSACTIONRESPONSE']._serialized_end=3013
+  _globals['_PRINTTHREADSTACKTRACECOMMAND']._serialized_start=3015
+  _globals['_PRINTTHREADSTACKTRACECOMMAND']._serialized_end=3060
+  _globals['_RECONNECTENDPOINTSCOMMAND']._serialized_start=3062
+  _globals['_RECONNECTENDPOINTSCOMMAND']._serialized_end=3104
+  _globals['_THREADSTACKTRACE']._serialized_start=3106
+  _globals['_THREADSTACKTRACE']._serialized_end=3195
+  _globals['_VERIFYMESSAGECOMMAND']._serialized_start=3197
+  _globals['_VERIFYMESSAGECOMMAND']._serialized_end=3280
+  _globals['_VERIFYMESSAGERESULT']._serialized_start=3282
+  _globals['_VERIFYMESSAGERESULT']._serialized_end=3318
+  _globals['_RECOVERORPHANEDTRANSACTIONCOMMAND']._serialized_start=3320
+  _globals['_RECOVERORPHANEDTRANSACTIONCOMMAND']._serialized_end=3425
+  _globals['_NOTIFYUNSUBSCRIBELITECOMMAND']._serialized_start=3427
+  _globals['_NOTIFYUNSUBSCRIBELITECOMMAND']._serialized_end=3477
+  _globals['_TELEMETRYCOMMAND']._serialized_start=3480
+  _globals['_TELEMETRYCOMMAND']._serialized_end=4213
+  _globals['_NOTIFYCLIENTTERMINATIONREQUEST']._serialized_start=4215
+  _globals['_NOTIFYCLIENTTERMINATIONREQUEST']._serialized_end=4307
+  _globals['_NOTIFYCLIENTTERMINATIONRESPONSE']._serialized_start=4309
+  _globals['_NOTIFYCLIENTTERMINATIONRESPONSE']._serialized_end=4386
+  _globals['_CHANGEINVISIBLEDURATIONREQUEST']._serialized_start=4389
+  _globals['_CHANGEINVISIBLEDURATIONREQUEST']._serialized_end=4610
+  _globals['_CHANGEINVISIBLEDURATIONRESPONSE']._serialized_start=4612
+  _globals['_CHANGEINVISIBLEDURATIONRESPONSE']._serialized_end=4713
+  _globals['_PULLMESSAGEREQUEST']._serialized_start=4716
+  _globals['_PULLMESSAGEREQUEST']._serialized_end=4996
+  _globals['_PULLMESSAGERESPONSE']._serialized_start=4999
+  _globals['_PULLMESSAGERESPONSE']._serialized_end=5148
+  _globals['_UPDATEOFFSETREQUEST']._serialized_start=5151
+  _globals['_UPDATEOFFSETREQUEST']._serialized_end=5290
+  _globals['_UPDATEOFFSETRESPONSE']._serialized_start=5292
+  _globals['_UPDATEOFFSETRESPONSE']._serialized_end=5358
+  _globals['_GETOFFSETREQUEST']._serialized_start=5360
+  _globals['_GETOFFSETREQUEST']._serialized_end=5480
+  _globals['_GETOFFSETRESPONSE']._serialized_start=5482
+  _globals['_GETOFFSETRESPONSE']._serialized_end=5561
+  _globals['_QUERYOFFSETREQUEST']._serialized_start=5564
+  _globals['_QUERYOFFSETREQUEST']._serialized_end=5775
+  _globals['_QUERYOFFSETRESPONSE']._serialized_start=5777
+  _globals['_QUERYOFFSETRESPONSE']._serialized_end=5858
+  _globals['_RECALLMESSAGEREQUEST']._serialized_start=5860
+  _globals['_RECALLMESSAGEREQUEST']._serialized_end=5950
+  _globals['_RECALLMESSAGERESPONSE']._serialized_start=5952
+  _globals['_RECALLMESSAGERESPONSE']._serialized_end=6039
+  _globals['_SYNCLITESUBSCRIPTIONREQUEST']._serialized_start=6042
+  _globals['_SYNCLITESUBSCRIPTIONREQUEST']._serialized_end=6279
+  _globals['_SYNCLITESUBSCRIPTIONRESPONSE']._serialized_start=6281
+  _globals['_SYNCLITESUBSCRIPTIONRESPONSE']._serialized_end=6355
+  _globals['_MESSAGINGSERVICE']._serialized_start=6358
+  _globals['_MESSAGINGSERVICE']._serialized_end=8226
 # @@protoc_insertion_point(module_scope)
diff --git a/python/rocketmq/grpc_protocol/service_pb2_grpc.py b/python/rocketmq/grpc_protocol/service_pb2_grpc.py
index b04a190d..091cad16 100644
--- a/python/rocketmq/grpc_protocol/service_pb2_grpc.py
+++ b/python/rocketmq/grpc_protocol/service_pb2_grpc.py
@@ -117,6 +117,11 @@ class MessagingServiceStub(object):
                 
request_serializer=service__pb2.RecallMessageRequest.SerializeToString,
                 
response_deserializer=service__pb2.RecallMessageResponse.FromString,
                 )
+        self.SyncLiteSubscription = channel.unary_unary(
+                '/apache.rocketmq.v2.MessagingService/SyncLiteSubscription',
+                
request_serializer=service__pb2.SyncLiteSubscriptionRequest.SerializeToString,
+                
response_deserializer=service__pb2.SyncLiteSubscriptionResponse.FromString,
+                )
 
 
 class MessagingServiceServicer(object):
@@ -308,6 +313,13 @@ class MessagingServiceServicer(object):
         context.set_details('Method not implemented!')
         raise NotImplementedError('Method not implemented!')
 
+    def SyncLiteSubscription(self, request, context):
+        """Sync lite subscription info, lite push consumer only
+        """
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details('Method not implemented!')
+        raise NotImplementedError('Method not implemented!')
+
 
 def add_MessagingServiceServicer_to_server(servicer, server):
     rpc_method_handlers = {
@@ -391,6 +403,11 @@ def add_MessagingServiceServicer_to_server(servicer, 
server):
                     
request_deserializer=service__pb2.RecallMessageRequest.FromString,
                     
response_serializer=service__pb2.RecallMessageResponse.SerializeToString,
             ),
+            'SyncLiteSubscription': grpc.unary_unary_rpc_method_handler(
+                    servicer.SyncLiteSubscription,
+                    
request_deserializer=service__pb2.SyncLiteSubscriptionRequest.FromString,
+                    
response_serializer=service__pb2.SyncLiteSubscriptionResponse.SerializeToString,
+            ),
     }
     generic_handler = grpc.method_handlers_generic_handler(
             'apache.rocketmq.v2.MessagingService', rpc_method_handlers)
@@ -682,3 +699,20 @@ class MessagingService(object):
             service__pb2.RecallMessageResponse.FromString,
             options, channel_credentials,
             insecure, call_credentials, compression, wait_for_ready, timeout, 
metadata)
+
+    @staticmethod
+    def SyncLiteSubscription(request,
+            target,
+            options=(),
+            channel_credentials=None,
+            call_credentials=None,
+            insecure=False,
+            compression=None,
+            wait_for_ready=None,
+            timeout=None,
+            metadata=None):
+        return grpc.experimental.unary_unary(request, target, 
'/apache.rocketmq.v2.MessagingService/SyncLiteSubscription',
+            service__pb2.SyncLiteSubscriptionRequest.SerializeToString,
+            service__pb2.SyncLiteSubscriptionResponse.FromString,
+            options, channel_credentials,
+            insecure, call_credentials, compression, wait_for_ready, timeout, 
metadata)
diff --git a/python/rocketmq/v5/client/client.py 
b/python/rocketmq/v5/client/client.py
index 7ee51495..05f01625 100644
--- a/python/rocketmq/v5/client/client.py
+++ b/python/rocketmq/v5/client/client.py
@@ -60,6 +60,7 @@ class Client:
         self.__client_callback_executor = None
         self.__is_running = False
         self.__had_shutdown = False
+        self._init_settings_event = threading.Event()
 
     def startup(self):
         try:
@@ -71,12 +72,15 @@ class Client:
             self._pre_start()
             try:
                 # pre update topic route for producer or consumer
-                for topic in self.__topics:
-                    self.__update_topic_route(topic)
+                if self.__topics:
+                    for topic in self.__topics:
+                        if not self.__update_topic_route(topic):
+                            raise Exception("update topic raise exception when 
client startup")
+                    self._init_settings_event.wait()
             except Exception as e:
                 # ignore this exception and retrieve again when calling send 
or receive
                 logger.warn(
-                    f"update topic exception when client startup, ignore it, 
try it again in scheduler. exception: {e}"
+                    f"update topic raise exception when client startup, ignore 
it, try it again in scheduler. exception: {e}"
                 )
             self.__start_scheduler()
             self.__start_async_rpc_callback_executor()
@@ -104,6 +108,7 @@ class Client:
             self.__rpc_client.stop()
             self.__topic_route_cache.clear()
             self.__topics.clear()
+            self._init_settings_event = None
             self.__had_shutdown = True
             self.__is_running = False
         except Exception as e:
@@ -239,11 +244,13 @@ class Client:
 
     def _retrieve_topic_route_data(self, topic):
         route = self.__topic_route_cache.get(topic)
-        if route is not None:
+        if route:
+            if topic not in self.__topics:
+                self.__topics.add(topic)
             return route
         else:
             route = self.__update_topic_route(topic)
-            if route is not None:
+            if route:
                 logger.info(f"{self} update topic:{topic} route success.")
                 self.__topics.add(topic)
                 return route
@@ -299,9 +306,9 @@ class Client:
             res = future.result()
             self.__handle_topic_route_res(res, topic)
         except Exception as e:
-            raise e
+            logger.error(f"query topic raise exception, {e}")
         finally:
-            if event is not None:
+            if event:
                 event.set()
 
     def __topic_route_req(self, topic):
@@ -312,7 +319,7 @@ class Client:
         return req
 
     def __handle_topic_route_res(self, res, topic):
-        if res is not None:
+        if res:
             MessagingResultChecker.check(res.status)
             if res.status.code == Code.OK:
                 topic_route = TopicRouteData(res.message_queues)
@@ -343,12 +350,12 @@ class Client:
     def __heartbeat_callback(self, future, endpoints):
         try:
             res = future.result()
-            if res is not None and res.status.code == Code.OK:
+            if res and res.status.code == Code.OK:
                 logger.info(
                     f"{self} send heartbeat to {endpoints} success."
                 )
             else:
-                if res is not None:
+                if res:
                     logger.error(
                         f"{self} send heartbeat to {endpoints} error, 
code:{res.status.code}, message:{res.status.message}."
                     )
@@ -469,6 +476,7 @@ class Client:
         if self.__client_callback_executor:
             self.__client_callback_executor.shutdown()
             self.__client_callback_executor = None
+            logger.info("stop client callback executor.")
 
     """ property """
 
diff --git a/python/rocketmq/v5/client/connection/rpc_channel.py 
b/python/rocketmq/v5/client/connection/rpc_channel.py
index 0842bc5e..6ca598e6 100644
--- a/python/rocketmq/v5/client/connection/rpc_channel.py
+++ b/python/rocketmq/v5/client/connection/rpc_channel.py
@@ -129,21 +129,18 @@ class RpcStreamStreamCall:
                     res = await self.__stream_stream_call.read()
                     if res.HasField("settings"):
                         # read a response for send setting result
-                        if res is not None and res.status.code == Code.OK:
+                        if res and res.status.code == Code.OK:
                             logger.debug(
                                 f"{ self.__handler} sync setting success. 
response status code: {res.status.code}"
                             )
-                            if (
-                                res.settings is not None
-                                and res.settings.metric is not None
-                            ):
+                            if res.settings and res.settings.metric:
                                 # reset metrics if needed
                                 
self.__handler.reset_metric(res.settings.metric)
                                 # sync setting
                                 self.__handler.reset_setting(res.settings)
                     elif res.HasField("recover_orphaned_transaction_command"):
                         # sever check for a transaction message
-                        if self.__handler is not None:
+                        if self.__handler:
                             transaction_id = (
                                 
res.recover_orphaned_transaction_command.transaction_id
                             )
@@ -161,14 +158,14 @@ class RpcStreamStreamCall:
                 )
 
     async def stream_write(self, req):
-        if self.__stream_stream_call is not None:
+        if self.__stream_stream_call:
             try:
                 await self.__stream_stream_call.write(req)
             except Exception as e:
                 raise e
 
     def close(self):
-        if self.__stream_stream_call is not None:
+        if self.__stream_stream_call:
             self.__stream_stream_call.cancel()
 
 
@@ -189,9 +186,9 @@ class RpcChannel:
         self.__create_aio_channel()
 
     def close_channel(self, loop):
-        if self.__async_channel is not None:
+        if self.__async_channel:
             # close stream_stream_call
-            if self.__telemetry_stream_stream_call is not None:
+            if self.__telemetry_stream_stream_call:
                 self.__telemetry_stream_stream_call.close()
                 self.__telemetry_stream_stream_call = None
                 logger.info(
@@ -220,7 +217,7 @@ class RpcChannel:
 
     def __create_aio_channel(self):
         try:
-            if self.__endpoints is None:
+            if not self.__endpoints:
                 raise IllegalArgumentException(
                     "create_aio_channel exception, endpoints is None"
                 )
diff --git a/python/rocketmq/v5/client/connection/rpc_client.py 
b/python/rocketmq/v5/client/connection/rpc_client.py
index c5c5f3ec..21237f94 100644
--- a/python/rocketmq/v5/client/connection/rpc_client.py
+++ b/python/rocketmq/v5/client/connection/rpc_client.py
@@ -28,6 +28,7 @@ from rocketmq.grpc_protocol import (AckMessageRequest,
                                     QueryAssignmentRequest, QueryRouteRequest,
                                     RecallMessageRequest,
                                     ReceiveMessageRequest, SendMessageRequest,
+                                    SyncLiteSubscriptionRequest,
                                     TelemetryCommand)
 from rocketmq.v5.client.connection import RpcChannel, RpcEndpoints
 from rocketmq.v5.log import logger
@@ -193,12 +194,14 @@ class RpcClient:
         return RpcClient.__run_message_service_async(
             self.__forward_message_to_dead_letter_queue_async_0(endpoints, 
req, metadata=metadata, timeout=timeout))
 
-    """ build stream_stream_call """
+    def sync_lite_subscription_async(self, endpoints: RpcEndpoints, req: 
SyncLiteSubscriptionRequest, metadata, timeout=3):
+        return RpcClient.__run_message_service_async(
+            self.__sync_lite_subscription_0(endpoints, req, metadata=metadata, 
timeout=timeout))
 
     def telemetry_stream(
         self, endpoints: RpcEndpoints, client, metadata, rebuild, timeout=3000
     ):
-        # assert asyncio.get_running_loop() == RpcClient._io_loop
+        # build grpc stream_stream_call
         try:
             channel = self.retrieve_or_create_channel(endpoints)
             stream = channel.async_stub.Telemetry(
@@ -298,6 +301,10 @@ class RpcClient:
         return await 
self.retrieve_or_create_channel(endpoints).async_stub.ForwardMessageToDeadLetterQueue(req,
 metadata=metadata,
                                                                                
                            timeout=timeout)
 
+    async def __sync_lite_subscription_0(self, endpoints: RpcEndpoints,
+                                         req: SyncLiteSubscriptionRequest, 
metadata, timeout=3):
+        return await 
self.retrieve_or_create_channel(endpoints).async_stub.SyncLiteSubscription(req, 
metadata=metadata, timeout=timeout)
+
     async def __create_channel_async(self, endpoints: RpcEndpoints):
         return self.retrieve_or_create_channel(endpoints)
 
diff --git a/python/rocketmq/v5/consumer/__init__.py 
b/python/rocketmq/v5/consumer/__init__.py
index b6a38b3b..daaf81c6 100644
--- a/python/rocketmq/v5/consumer/__init__.py
+++ b/python/rocketmq/v5/consumer/__init__.py
@@ -13,13 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .message_listener import ConsumeResult, MessageListener
-from .push_consumer import PushConsumer
-from .simple_consumer import SimpleConsumer
+from .push import (ConsumeResult, LitePushConsumer, MessageListener,
+                   PushConsumer)
+from .simple import SimpleConsumer
 
 __all__ = [
     "SimpleConsumer",
     "PushConsumer",
+    "LitePushConsumer",
     "MessageListener",
     "ConsumeResult",
 ]
diff --git a/python/rocketmq/v5/consumer/consumer.py 
b/python/rocketmq/v5/consumer/consumer.py
index 8be58ae3..dfcd46e5 100644
--- a/python/rocketmq/v5/consumer/consumer.py
+++ b/python/rocketmq/v5/consumer/consumer.py
@@ -57,7 +57,7 @@ class Consumer(Client):
         self._consumer_group = consumer_group
         # <String /* topic */, FilterExpression>
         self._subscriptions = ConcurrentMap()
-        if subscription is not None:
+        if subscription:
             self._subscriptions.update(subscription)
 
     def subscribe(self, topic, filter_expression: FilterExpression = None):
@@ -94,14 +94,16 @@ class Consumer(Client):
             self._subscriptions.remove(topic)
             self._remove_unused_topic_route_data(topic)
 
-    def ack(self, message: Message):
+    """ protect """
+
+    def _ack(self, message: Message):
         try:
             future = self.__ack(message)
             self.__handle_ack_result(future)
         except Exception as e:
             raise e
 
-    def ack_async(self, message: Message):
+    def _ack_async(self, message: Message):
         try:
             future = self.__ack(message)
             ret_future = Future()
@@ -113,14 +115,14 @@ class Consumer(Client):
         except Exception as e:
             raise e
 
-    def change_invisible_duration(self, message: Message, invisible_duration):
+    def _change_invisible_duration(self, message: Message, invisible_duration):
         try:
             future = self.__change_invisible_duration(message, 
invisible_duration)
             self.__handle_change_invisible_result(future, message)
         except Exception as e:
             raise e
 
-    def change_invisible_duration_async(self, message: Message, 
invisible_duration):
+    def _change_invisible_duration_async(self, message: Message, 
invisible_duration):
         try:
             future = self.__change_invisible_duration(message, 
invisible_duration)
             ret_future = Future()
@@ -132,8 +134,6 @@ class Consumer(Client):
         except Exception as e:
             raise e
 
-    """ protect """
-
     def _receive(self, queue, req, timeout):
         try:
             receive_future = self.rpc_client.receive_message_async(
@@ -170,8 +170,9 @@ class Consumer(Client):
         req.group.name = self._consumer_group
         req.group.resource_namespace = self.client_configuration.namespace
         req.message_queue.CopyFrom(queue.message_queue0())
-        req.filter_expression.type = filter_expression.filter_type
-        req.filter_expression.expression = filter_expression.expression
+        if filter_expression:
+            req.filter_expression.type = filter_expression.filter_type
+            req.filter_expression.expression = filter_expression.expression
         req.batch_size = max_message_num
         if invisible_duration:
             req.invisible_duration.seconds = invisible_duration
@@ -250,6 +251,8 @@ class Consumer(Client):
         msg_entry = AckMessageEntry()
         msg_entry.message_id = message.message_id
         msg_entry.receipt_handle = message.receipt_handle
+        if self.client_type == ClientType.LITE_PUSH_CONSUMER:
+            msg_entry.lite_topic = message.lite_topic
         req.entries.append(msg_entry)
         return req
 
diff --git a/python/rocketmq/v5/consumer/__init__.py 
b/python/rocketmq/v5/consumer/push/__init__.py
similarity index 92%
copy from python/rocketmq/v5/consumer/__init__.py
copy to python/rocketmq/v5/consumer/push/__init__.py
index b6a38b3b..3afabd9b 100644
--- a/python/rocketmq/v5/consumer/__init__.py
+++ b/python/rocketmq/v5/consumer/push/__init__.py
@@ -13,13 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from .lite_push_consumer import LitePushConsumer
 from .message_listener import ConsumeResult, MessageListener
 from .push_consumer import PushConsumer
-from .simple_consumer import SimpleConsumer
 
 __all__ = [
-    "SimpleConsumer",
     "PushConsumer",
+    "LitePushConsumer",
     "MessageListener",
     "ConsumeResult",
 ]
diff --git a/python/rocketmq/v5/consumer/consumption.py 
b/python/rocketmq/v5/consumer/push/consumption.py
similarity index 97%
rename from python/rocketmq/v5/consumer/consumption.py
rename to python/rocketmq/v5/consumer/push/consumption.py
index b4170241..2437a768 100644
--- a/python/rocketmq/v5/consumer/consumption.py
+++ b/python/rocketmq/v5/consumer/push/consumption.py
@@ -16,10 +16,9 @@
 import functools
 from concurrent.futures import ThreadPoolExecutor
 
+from rocketmq.v5.consumer.push.message_listener import ConsumeResult
 from rocketmq.v5.log import logger
 
-from .message_listener import ConsumeResult
-
 
 class Consumption:
 
diff --git a/python/rocketmq/v5/consumer/fifo_consumption.py 
b/python/rocketmq/v5/consumer/push/fifo_consumption.py
similarity index 92%
rename from python/rocketmq/v5/consumer/fifo_consumption.py
rename to python/rocketmq/v5/consumer/push/fifo_consumption.py
index 5d4f0658..035d8947 100644
--- a/python/rocketmq/v5/consumer/fifo_consumption.py
+++ b/python/rocketmq/v5/consumer/push/fifo_consumption.py
@@ -18,9 +18,8 @@ import time
 from collections import defaultdict
 from concurrent.futures import ThreadPoolExecutor
 
-from rocketmq.v5.consumer.message_listener import ConsumeResult
+from rocketmq.v5.consumer.push.message_listener import ConsumeResult
 from rocketmq.v5.log import logger
-from rocketmq.v5.model.retry_policy import RetryPolicy
 
 
 class FifoConsumption:
@@ -90,12 +89,8 @@ class FifoConsumption:
             return consume_result
 
         # reconsume
-        if self.__backoff_policy:
-            max_attempts = self.__backoff_policy.max_attempts
-            attempt_delay = 
self.__backoff_policy.get_next_attempt_delay(attempt)
-        else:
-            max_attempts = RetryPolicy.DEFAULT_MAX_ATTEMPTS
-            attempt_delay = RetryPolicy.DEFAULT_RECONSUME_DELAY
+        max_attempts = self.__backoff_policy.max_attempts
+        attempt_delay = self.__backoff_policy.get_next_attempt_delay(attempt)
         if attempt >= max_attempts:
             return consume_result
         time.sleep(attempt_delay)
diff --git a/python/rocketmq/v5/consumer/push/lite_push_consumer.py 
b/python/rocketmq/v5/consumer/push/lite_push_consumer.py
new file mode 100644
index 00000000..56d0609a
--- /dev/null
+++ b/python/rocketmq/v5/consumer/push/lite_push_consumer.py
@@ -0,0 +1,267 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from rocketmq.grpc_protocol import (ClientType, Code, LiteSubscriptionAction,
+                                    SyncLiteSubscriptionRequest)
+from rocketmq.v5.client import ClientConfiguration, ClientScheduler
+from rocketmq.v5.exception import (IllegalStateException,
+                                   LiteSubscriptionQuotaExceededException)
+from rocketmq.v5.log import logger
+from rocketmq.v5.model.filter_expression import FilterExpression
+from rocketmq.v5.util import AtomicInteger, MessagingResultChecker
+
+from .message_listener import MessageListener
+from .push_consumer import PushConsumer
+
+
+class LitePushConsumer(PushConsumer):
+    """Push consumer of client type ``LITE_PUSH_CONSUMER`` bound to one topic.
+
+    The bind topic is subscribed with a default ``FilterExpression``. Lite
+    topics are added and removed through ``subscribe_lite`` and
+    ``unsubscribe_lite``; the locally recorded set is re-sent to the server
+    by a scheduler started in ``_on_start``.
+    """
+
+    def __init__(
+        self,
+        client_configuration: ClientConfiguration,
+        consumer_group,
+        bind_topic,
+        message_listener: MessageListener = None,
+        max_cache_message_count=1024,
+        max_cache_message_size=64 * 1024 * 1024,  # in bytes, 64MB default
+        consumption_thread_count=20,
+        tls_enable=False,
+    ):
+        """Create a consumer whose only topic subscription is ``bind_topic``.
+
+        Raises:
+            Exception: if ``bind_topic`` is empty or None.
+        """
+        if not bind_topic:
+            raise Exception("bindTopic has not been set yet.")
+
+        super().__init__(
+            client_configuration,
+            consumer_group,
+            message_listener,
+            {bind_topic: FilterExpression()},
+            max_cache_message_count,
+            max_cache_message_size,
+            consumption_thread_count,
+            tls_enable,
+            ClientType.LITE_PUSH_CONSUMER
+        )
+        self.__bind_topic = bind_topic
+        self.__lite_topics = set()
+        # both limits are defaults until the server sends its settings
+        self.__lite_subscription_quota = AtomicInteger(0)
+        self.__max_lite_topic_size = 64
+        # created in _on_start(); declared here so the attribute always exists
+        self._sync_all_lite_subscription_scheduler = None
+
+    """ override """
+
+    def _on_start(self):
+        """Run the base start hook, then start the full-sync scheduler."""
+        super()._on_start()
+        try:
+            # NOTE(review): 30, 30 are presumably the initial delay and the
+            # period in seconds - confirm against ClientScheduler.
+            self._sync_all_lite_subscription_scheduler = ClientScheduler(
+                f"{self.client_id}_sync_all_lite_subscription_scheduler_thread",
+                self.__sync_all_lite_subscription,
+                30,
+                30,
+                self._rpc_channel_io_loop(),
+            )
+            self._sync_all_lite_subscription_scheduler.start_scheduler()
+            logger.info("start sync all lite subscription scheduler success.")
+        except Exception as e:
+            logger.error(f"{self} on start error, {e}")
+            raise
+
+    def reset_setting(self, settings):
+        """Apply server settings to the lite limits and to the base consumer.
+
+        The lite quota and max lite-topic size are overwritten only when the
+        server supplies both as non-zero values. The base settings are applied
+        either way: ``PushConsumer.reset_setting`` is what sets
+        ``_init_settings_event``, and ``Client.startup`` blocks on that event,
+        so skipping it would leave ``startup()`` waiting.
+        """
+        if not settings or not settings.subscription:
+            return
+        subscription = settings.subscription
+        quota = subscription.lite_subscription_quota
+        max_size = subscription.max_lite_topic_size
+        if quota and max_size:
+            self.__lite_subscription_quota = AtomicInteger(quota)
+            self.__max_lite_topic_size = max_size
+        super().reset_setting(settings)
+
+    """ public """
+
+    def subscribe_lite(self, lite_topic):
+        """Subscribe to ``lite_topic`` with a ``PARTIAL_ADD`` sync request.
+
+        The topic is recorded locally only when the response code is OK. A
+        failed request is logged and not re-raised. Already recorded topics
+        return immediately.
+
+        Raises:
+            IllegalStateException: if the consumer is not running, or
+                ``lite_topic`` is blank or longer than the max size.
+            LiteSubscriptionQuotaExceededException: if one more lite topic
+                would exceed the subscription quota.
+        """
+        if not self.is_running:
+            raise IllegalStateException(
+                "unable to add lite subscription "
+                "because consumer is not running"
+            )
+        # validate before the membership test so a blank value is always
+        # rejected with the same exception
+        if not lite_topic:
+            raise IllegalStateException("liteTopic is blank.")
+        if lite_topic in self.__lite_topics:
+            return
+        if len(lite_topic) > self.__max_lite_topic_size:
+            raise IllegalStateException(
+                f"lite_topic: {lite_topic} length "
+                f"exceeded max length {self.__max_lite_topic_size}."
+            )
+        quota = self.__lite_subscription_quota.get()
+        if len(self.__lite_topics) + 1 > quota:
+            raise LiteSubscriptionQuotaExceededException(
+                f"Lite subscription exceed quota: {quota} ",
+                Code.LITE_SUBSCRIPTION_QUOTA_EXCEEDED,
+            )
+        try:
+            res = self.__sync_lite_subscription(
+                {lite_topic}, LiteSubscriptionAction.PARTIAL_ADD
+            )
+            if res.status.code == Code.OK:
+                self.__lite_topics.add(lite_topic)
+                logger.info(
+                    f"[{self}] subscribe lite_topic:{lite_topic} success."
+                )
+        except Exception as e:
+            # best effort by design: report the failure but do not propagate
+            logger.error(
+                f"[{self}] subscribe lite_topic:{lite_topic} "
+                f"raise exception, {e}."
+            )
+
+    def unsubscribe_lite(self, lite_topic):
+        """Unsubscribe ``lite_topic`` with a ``PARTIAL_REMOVE`` sync request.
+
+        Topics that are not recorded locally are ignored. The topic is
+        dropped from the local set only when the response code is OK. A
+        failed request is logged and not re-raised.
+
+        Raises:
+            IllegalStateException: if the consumer is not running.
+        """
+        if not self.is_running:
+            raise IllegalStateException(
+                "unable to remove lite subscription "
+                "because consumer is not running"
+            )
+        if lite_topic not in self.__lite_topics:
+            return
+        try:
+            res = self.__sync_lite_subscription(
+                {lite_topic}, LiteSubscriptionAction.PARTIAL_REMOVE
+            )
+            if res.status.code == Code.OK:
+                # discard: the topic may already be gone if another caller
+                # removed it while the request was in flight
+                self.__lite_topics.discard(lite_topic)
+                logger.info(
+                    f"[{self}] unsubscribe lite_topic:{lite_topic} success."
+                )
+        except Exception as e:
+            logger.error(
+                f"[{self}] unsubscribe lite_topic:{lite_topic} "
+                f"raise exception, {e}."
+            )
+
+    def subscribe(self, topic, filter_expression: FilterExpression = None):
+        """Not supported: always raises ``NotImplementedError``."""
+        raise NotImplementedError(
+            "LitePushConsumer does not support topic subscription."
+        )
+
+    def unsubscribe(self, topic):
+        """Not supported: always raises ``NotImplementedError``."""
+        raise NotImplementedError(
+            "LitePushConsumer does not support topic unsubscription."
+        )
+
+    """ private """
+
+    def __sync_all_lite_subscription(self):
+        """Scheduler callback: send every recorded lite topic as COMPLETE_ADD.
+
+        Does nothing while no lite topic is recorded. Failures are logged and
+        not re-raised.
+        """
+        if not self.__lite_topics:
+            return
+        endpoints = self.client_configuration.rpc_endpoints
+        try:
+            # send a snapshot so later changes to the set do not affect it
+            res = self.__sync_lite_subscription(
+                self.__lite_topics.copy(), LiteSubscriptionAction.COMPLETE_ADD
+            )
+            if res.status.code == Code.OK:
+                logger.info(
+                    f"{self} sync all lite subscription to {endpoints} success."
+                )
+        except Exception as e:
+            logger.error(
+                f"[{self}] sync all lite subscription to {endpoints} "
+                f"raise exception, {e}"
+            )
+
+    def __sync_lite_subscription(self, lite_topics, action):
+        """Send one sync request for ``lite_topics`` and return the response.
+
+        The call blocks on the returned future and passes the response status
+        to ``MessagingResultChecker.check`` before returning.
+        """
+        res = self.rpc_client.sync_lite_subscription_async(
+            self.client_configuration.rpc_endpoints,
+            self.__sync_lite_subscription_req(lite_topics, action),
+            metadata=self._sign(),
+            timeout=self.client_configuration.request_timeout,
+        ).result()
+        MessagingResultChecker.check(res.status)
+        return res
+
+    def __sync_lite_subscription_req(
+        self, lite_topics, action: LiteSubscriptionAction
+    ):
+        """Build the request for the bind topic, group and given lite topics."""
+        req = SyncLiteSubscriptionRequest()
+        req.action = action
+        req.topic.name = self.__bind_topic
+        req.topic.resource_namespace = self.client_configuration.namespace
+        req.group.name = self._consumer_group
+        req.group.resource_namespace = self.client_configuration.namespace
+        req.lite_topic_set.extend(lite_topics)
+        return req
diff --git a/python/rocketmq/v5/consumer/message_listener.py 
b/python/rocketmq/v5/consumer/push/message_listener.py
similarity index 100%
rename from python/rocketmq/v5/consumer/message_listener.py
rename to python/rocketmq/v5/consumer/push/message_listener.py
diff --git a/python/rocketmq/v5/consumer/push_consumer.py 
b/python/rocketmq/v5/consumer/push/push_consumer.py
similarity index 85%
rename from python/rocketmq/v5/consumer/push_consumer.py
rename to python/rocketmq/v5/consumer/push/push_consumer.py
index 03c62736..e0170988 100644
--- a/python/rocketmq/v5/consumer/push_consumer.py
+++ b/python/rocketmq/v5/consumer/push/push_consumer.py
@@ -24,11 +24,11 @@ from rocketmq.grpc_protocol import (ClientType, Code,
                                     QueryAssignmentRequest)
 from rocketmq.v5.client import ClientConfiguration, ClientScheduler
 from rocketmq.v5.consumer.consumer import Consumer
-from rocketmq.v5.consumer.consumption import Consumption
-from rocketmq.v5.consumer.fifo_consumption import FifoConsumption
-from rocketmq.v5.consumer.message_listener import (ConsumeResult,
-                                                   MessageListener)
-from rocketmq.v5.exception import BadRequestException, IllegalStateException
+from rocketmq.v5.consumer.push.consumption import Consumption
+from rocketmq.v5.consumer.push.fifo_consumption import FifoConsumption
+from rocketmq.v5.consumer.push.message_listener import (ConsumeResult,
+                                                        MessageListener)
+from rocketmq.v5.exception import BadRequestException
 from rocketmq.v5.log import logger
 from rocketmq.v5.model.assignment import Assignments
 from rocketmq.v5.model.process_queue import ProcessQueue
@@ -39,6 +39,7 @@ from rocketmq.v5.util import ConcurrentMap, 
MessagingResultChecker
 class PushConsumer(Consumer):
 
     ACK_MESSAGE_FAILURE_BACKOFF_DELAY = 1
+    DEFAULT_CONSUME_ATTEMPTS = 17
     RECEIVE_RETRY_DELAY = 1
 
     def __init__(
@@ -50,12 +51,13 @@ class PushConsumer(Consumer):
         max_cache_message_count=1024,
         max_cache_message_size=64 * 1024 * 1024,  # in bytes, 64MB default
         consumption_thread_count=20,
-        tls_enable=False
+        tls_enable=False,
+        client_type=ClientType.PUSH_CONSUMER
     ):
         super().__init__(
             client_configuration,
             consumer_group,
-            ClientType.PUSH_CONSUMER,
+            client_type,
             subscription,
             tls_enable
         )
@@ -73,11 +75,15 @@ class PushConsumer(Consumer):
         self.__receive_batch_size = 32
         self.__long_polling_timeout = 30  # seconds
 
-    def set_listener(self, message_listener: MessageListener):
-        self.__message_listener = message_listener
-
     """ override """
 
+    def unsubscribe(self, topic):
+        super().unsubscribe(topic)
+        for message_queue, process_queue in self.__process_queues.items():
+            if message_queue.topic == topic:
+                self.__drop_message_queue(message_queue)
+        self.__assignments.remove(topic)
+
     def shutdown(self):
         logger.info(f"begin to to shutdown {self}.")
         super().shutdown()
@@ -85,28 +91,34 @@ class PushConsumer(Consumer):
         logger.info(f"shutdown {self} success.")
 
     def reset_setting(self, settings):
-        self.__long_polling_timeout = 
settings.subscription.long_polling_timeout.seconds
-        self.__configure_consumer_consumption(settings)
+        if settings:
+            self.__long_polling_timeout = 
settings.subscription.long_polling_timeout.seconds
+            self.__configure_consumer_consumption(settings)
+            if not self._init_settings_event.is_set():
+                self._init_settings_event.set()
 
     def reset_metric(self, metric):
         super().reset_metric(metric)
         self.__register_process_queues_gauges()
 
-    def _sync_setting_req(self, endpoints):
-        req = super()._sync_setting_req(endpoints)
-        req.settings.subscription.long_polling_timeout.seconds = 
self.__long_polling_timeout
-        return req
+    # def _sync_setting_req(self, endpoints):
+    #     req = super()._sync_setting_req(endpoints)
+    #     req.settings.subscription.long_polling_timeout.seconds = 
self.__long_polling_timeout
+    #     return req
 
     def _pre_start(self):
         if not self.__message_listener:
-            logger.error("message listener has not been set yet")
-            raise IllegalStateException(f"{self}'s message listener has not 
been set yet.")
+            raise Exception("messageListener has not been set yet.")
+
+        if not self._subscriptions.keys():
+            raise Exception("subscription expressions have not been set yet.")
 
     def _on_start(self):
         try:
             self.__start_async_executor()
             self.__scan_assignment_scheduler = 
ClientScheduler(f"{self.client_id}_scan_assignment_schedule_thread", 
self.__scan_assignment, 1, 5, self._rpc_channel_io_loop())
             self.__scan_assignment_scheduler.start_scheduler()
+            logger.info("start scan assignment scheduler success.")
             logger.info(f"{self} start success.")
         except Exception as e:
             logger.error(f"{self} on start error, {e}")
@@ -128,10 +140,11 @@ class PushConsumer(Consumer):
             raise e
 
     def __configure_consumer_consumption(self, settings):
-        if settings.backoff_policy.WhichOneof("strategy") == 
"customized_backoff":
-            backoff_policy = 
CustomizedBackoffRetryPolicy(settings.backoff_policy)
+        use_customized_backoff = 
settings.backoff_policy.WhichOneof("strategy") == "customized_backoff"
+        if use_customized_backoff:
+            backoff_policy = 
CustomizedBackoffRetryPolicy(settings.backoff_policy, 
PushConsumer.DEFAULT_CONSUME_ATTEMPTS)
         else:
-            backoff_policy = CustomizedBackoffRetryPolicy(None)
+            backoff_policy = CustomizedBackoffRetryPolicy(None, 
PushConsumer.DEFAULT_CONSUME_ATTEMPTS)
 
         if not self.__consumption:
             self.__fifo = settings.subscription.fifo
@@ -143,8 +156,10 @@ class PushConsumer(Consumer):
                 self.__consumption = FifoConsumption(self.__message_listener, 
self.__consumption_thread_count,
                                                      
self.__on_fifo_consumption_result,
                                                      self.client_id, 
backoff_policy)
+            logger.info(f"configure {self} consumption success, fifo: 
{self.__fifo}")
         else:
-            self.__consumption.backoff_policy = backoff_policy
+            if use_customized_backoff and self.__consumption.backoff_policy != 
backoff_policy:
+                self.__consumption.backoff_policy = backoff_policy
 
     # assignment #
 
@@ -183,12 +198,14 @@ class PushConsumer(Consumer):
             else:
                 existed = self.__assignments.get(topic)
                 if existed == new_assignments:
+                    self.__drop_expired_message_queue()
                     return
                 else:
                     # topic route changed
                     removed_message_queues = Assignments.diff_queues(existed, 
new_assignments)
                     self.__drop_message_queues(removed_message_queues)
                     new_message_queues = 
Assignments.diff_queues(new_assignments, existed)
+                    self.__assignments.put(topic, new_assignments)
             self.__drop_expired_message_queue()
             self.__process_message_queues(new_message_queues)
         except Exception as e:
@@ -210,7 +227,7 @@ class PushConsumer(Consumer):
             logger.error(f"queue: {message_queue} end receive, because 
consumer is not running.")
             return
         if process_queue.dropped:
-            logger.error(f"queue: {message_queue} end receive, because queue 
is dropped. ")
+            logger.info(f"queue: {message_queue} end receive, because queue is 
dropped. ")
             return
         if not attempt_id:
             attempt_id = self.__generate_attempt_id()
@@ -276,14 +293,14 @@ class PushConsumer(Consumer):
         # in consume thread
         try:
             if consume_result == ConsumeResult.SUCCESS:
-                self.ack(message)
+                self._ack(message)
             else:
                 delivery_attempt = message.delivery_attempt
                 if self.__consumption.backoff_policy:
                     invisible_duration = 
self.__consumption.backoff_policy.get_next_attempt_delay(delivery_attempt)
                 else:
                     invisible_duration = 30
-                self.change_invisible_duration(message, invisible_duration)
+                self._change_invisible_duration(message, invisible_duration)
             self.__evict_message(message, message_queue)
         except Exception as e:
             logger.error(f"ack or nack raise exception, {e}")
@@ -322,6 +339,8 @@ class PushConsumer(Consumer):
         req.message_id = message.message_id
         req.delivery_attempt = message.delivery_attempt
         req.max_delivery_attempts = 
self.__consumption.backoff_policy.max_attempts
+        if self.client_type == ClientType.LITE_PUSH_CONSUMER:
+            req.lite_topic = message.lite_topic
         return req
 
     def __forward_message_to_dead_letter_queue(self, message):
@@ -349,11 +368,14 @@ class PushConsumer(Consumer):
         for message_queue, process_queue in self.__process_queues.items():
             self.__execute_receive(message_queue, process_queue)
 
+    def __drop_message_queue(self, dropped_message_queue):
+        dropped_process_queue = 
self.__process_queues.remove(dropped_message_queue)
+        if dropped_process_queue:
+            dropped_process_queue.drop()
+
     def __drop_message_queues(self, dropped_message_queues):
         for dropped_message_queue in dropped_message_queues:
-            dropped_process_queue = 
self.__process_queues.remove(dropped_message_queue)
-            if dropped_process_queue:
-                dropped_process_queue.drop()
+            self.__drop_message_queue(dropped_message_queue)
 
     def __drop_expired_message_queue(self):
         expired_message_queue = [
@@ -378,7 +400,7 @@ class PushConsumer(Consumer):
         if process_queue:
             process_queue.evict_message(message)
 
-    def __aggregate_process_queues_by_topic(self, attr_name: str):
+    def __aggregate_process_queues_by_attr_name(self, attr_name: str):
         topic_values = {}
         for message_queue, process_queue in self.__process_queues.items():
             topic = message_queue.topic
@@ -397,10 +419,10 @@ class PushConsumer(Consumer):
         ]
 
     def __process_queues_cached_count(self):
-        return 
self.__aggregate_process_queues_by_topic("cached_messages_count")
+        return 
self.__aggregate_process_queues_by_attr_name("cached_messages_count")
 
     def __process_queues_cached_bytes(self):
-        return 
self.__aggregate_process_queues_by_topic("cached_messages_bytes")
+        return 
self.__aggregate_process_queues_by_attr_name("cached_messages_bytes")
 
     def __register_process_queues_gauges(self):
         
self.client_metrics.create_push_consumer_process_queue_observable_gauge(
diff --git a/python/rocketmq/v5/consumer/__init__.py 
b/python/rocketmq/v5/consumer/simple/__init__.py
similarity index 83%
copy from python/rocketmq/v5/consumer/__init__.py
copy to python/rocketmq/v5/consumer/simple/__init__.py
index b6a38b3b..bd12e42f 100644
--- a/python/rocketmq/v5/consumer/__init__.py
+++ b/python/rocketmq/v5/consumer/simple/__init__.py
@@ -13,13 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .message_listener import ConsumeResult, MessageListener
-from .push_consumer import PushConsumer
 from .simple_consumer import SimpleConsumer
 
 __all__ = [
     "SimpleConsumer",
-    "PushConsumer",
-    "MessageListener",
-    "ConsumeResult",
 ]
diff --git a/python/rocketmq/v5/consumer/simple_consumer.py 
b/python/rocketmq/v5/consumer/simple/simple_consumer.py
similarity index 90%
rename from python/rocketmq/v5/consumer/simple_consumer.py
rename to python/rocketmq/v5/consumer/simple/simple_consumer.py
index 332ae9d3..a29f5532 100644
--- a/python/rocketmq/v5/consumer/simple_consumer.py
+++ b/python/rocketmq/v5/consumer/simple/simple_consumer.py
@@ -72,6 +72,10 @@ class SimpleConsumer(Consumer):
         super().shutdown()
         logger.info(f"shutdown {self} success.")
 
+    def reset_setting(self, settings):
+        if not self._init_settings_event.is_set():
+            self._init_settings_event.set()
+
     def _on_start(self):
         logger.info(f"{self} start success.")
 
@@ -89,6 +93,20 @@ class SimpleConsumer(Consumer):
             return
         queue_selector.update(topic_route)
 
+    """ public """
+
+    def ack(self, message):
+        self._ack(message)
+
+    def ack_async(self, message):
+        self._ack_async(message)
+
+    def change_invisible_duration(self, message, invisible_duration):
+        self._change_invisible_duration(message, invisible_duration)
+
+    def change_invisible_duration_async(self, message, invisible_duration):
+        self._change_invisible_duration_async(message, invisible_duration)
+
     """ private """
 
     def __select_topic_for_receive(self):
diff --git a/python/rocketmq/v5/exception/__init__.py 
b/python/rocketmq/v5/exception/__init__.py
index 2f58a9fd..71a8ef8b 100644
--- a/python/rocketmq/v5/exception/__init__.py
+++ b/python/rocketmq/v5/exception/__init__.py
@@ -16,7 +16,10 @@
 from .client_exception import (BadRequestException, ClientException,
                                ForbiddenException, IllegalArgumentException,
                                IllegalStateException, InternalErrorException,
-                               NotFoundException, PayloadTooLargeException,
+                               LiteSubscriptionQuotaExceededException,
+                               LiteTopicQuotaExceededException,
+                               NotFoundException, PayloadEmptyException,
+                               PayloadTooLargeException,
                                PaymentRequiredException, ProxyTimeoutException,
                                RequestHeaderFieldsTooLargeException,
                                TooManyRequestsException, UnauthorizedException,
@@ -30,7 +33,10 @@ __all__ = [
     "ForbiddenException",
     "NotFoundException",
     "PayloadTooLargeException",
+    "PayloadEmptyException",
     "TooManyRequestsException",
+    "LiteSubscriptionQuotaExceededException",
+    "LiteTopicQuotaExceededException",
     "RequestHeaderFieldsTooLargeException",
     "InternalErrorException",
     "ProxyTimeoutException",
diff --git a/python/rocketmq/v5/exception/client_exception.py 
b/python/rocketmq/v5/exception/client_exception.py
index cd74134a..7c9e2bf0 100644
--- a/python/rocketmq/v5/exception/client_exception.py
+++ b/python/rocketmq/v5/exception/client_exception.py
@@ -21,7 +21,7 @@ class ClientException(Exception):
         self.__code = code
 
     def __str__(self):
-        if self.__code is not None:
+        if self.__code:
             return f"{self.__code}, {super().__str__()}"
         else:
             return f"{super().__str__()}"
@@ -32,69 +32,59 @@ class ClientException(Exception):
 
 
 class BadRequestException(ClientException):
-
-    def __init__(self, message, code):
-        super().__init__(message, code)
+    pass
 
 
 class UnauthorizedException(ClientException):
-
-    def __init__(self, message, code):
-        super().__init__(message, code)
+    pass
 
 
 class PaymentRequiredException(ClientException):
-
-    def __init__(self, message, code):
-        super().__init__(message, code)
+    pass
 
 
 class ForbiddenException(ClientException):
-
-    def __init__(self, message, code):
-        super().__init__(message, code)
+    pass
 
 
 class NotFoundException(ClientException):
-
-    def __init__(self, message, code):
-        super().__init__(message, code)
+    pass
 
 
 class PayloadTooLargeException(ClientException):
+    pass
+
 
-    def __init__(self, message, code):
-        super().__init__(message, code)
+class PayloadEmptyException(ClientException):
+    pass
 
 
 class TooManyRequestsException(ClientException):
+    pass
 
-    def __init__(self, message, code):
-        super().__init__(message, code)
 
+class LiteTopicQuotaExceededException(ClientException):
+    pass
 
-class RequestHeaderFieldsTooLargeException(ClientException):
 
-    def __init__(self, message, code):
-        super().__init__(message, code)
+class LiteSubscriptionQuotaExceededException(ClientException):
+    pass
 
 
-class InternalErrorException(ClientException):
+class RequestHeaderFieldsTooLargeException(ClientException):
+    pass
 
-    def __init__(self, message, code):
-        super().__init__(message, code)
 
+class InternalErrorException(ClientException):
+    pass
 
-class ProxyTimeoutException(ClientException):
 
-    def __init__(self, message, code):
-        super().__init__(message, code)
+class ProxyTimeoutException(ClientException):
+    pass
 
 
 class UnsupportedException(ClientException):
-
-    def __init__(self, message, code=None):
-        super().__init__(message, code)
+    pass
 
 
 class IllegalArgumentException(ClientException):
diff --git a/python/rocketmq/v5/model/message.py 
b/python/rocketmq/v5/model/message.py
index a94925f7..ce63334d 100644
--- a/python/rocketmq/v5/model/message.py
+++ b/python/rocketmq/v5/model/message.py
@@ -24,6 +24,7 @@ class Message:
     def __init__(self):
         self.__body = None
         self.__topic = None
+        self.__lite_topic = None
         self.__namespace = None
         self.__message_id = None
         self.__tag = None
@@ -51,6 +52,8 @@ class Message:
         try:
             self.__message_body_check_sum(message)
             self.__topic = message.topic.name
+            if message.system_properties.lite_topic:
+                self.__lite_topic = message.system_properties.lite_topic
             self.__namespace = message.topic.resource_namespace
             self.__message_id = message.system_properties.message_id
             self.__body = self.__uncompress_body(message)
@@ -89,14 +92,14 @@ class Message:
                 if message.system_properties.body_digest.checksum != md5_sum:
                     self.__corrupted = True
                     logger.error(
-                        f"(body_check_sum exception, topic: 
{message.topic.name}, message_id: {message.system_properties.message_id} 
{message.digest.checksum} != crc32_sum {md5_sum}"
+                        f"(body_check_sum exception, topic: 
{message.topic.name}, message_id: {message.system_properties.message_id} 
{message.digest.checksum} != md5_checksum {md5_sum}"
                     )
             elif message.system_properties.body_digest.type == DigestType.SHA1:
                 sha1_sum = Misc.sha1_checksum(message.body)
                 if message.system_properties.body_digest.checksum != sha1_sum:
                     self.__corrupted = True
                     logger.error(
-                        f"(body_check_sum exception, topic: 
{message.topic.name}, message_id: {message.system_properties.message_id} 
{message.digest.checksum} != crc32_sum {sha1_sum}"
+                        f"(body_check_sum exception, topic: 
{message.topic.name}, message_id: {message.system_properties.message_id} 
{message.digest.checksum} != sha1_checksum {sha1_sum}"
                     )
             else:
                 logger.error(
@@ -130,6 +133,10 @@ class Message:
     def topic(self):
         return self.__topic
 
+    @property
+    def lite_topic(self):
+        return self.__lite_topic
+
     @property
     def namespace(self):
         return self.__namespace
@@ -195,50 +202,58 @@ class Message:
         return self.__corrupted
 
     @body.setter
-    def body(self, body):
-        if body is None or body.strip() == "":
-            raise IllegalArgumentException("body should not be blank")
+    def body(self, body: bytes):
+        if not body:
+            raise IllegalArgumentException("body should not be blank.")
         self.__body = body
 
     @topic.setter
-    def topic(self, topic):
-        if topic is None or topic.strip() == "":
-            raise IllegalArgumentException("topic has not been set yet")
+    def topic(self, topic: str):
+        if not topic or not topic.strip():
+            raise IllegalArgumentException("topic has not been set yet.")
         if Misc.is_valid_topic(topic):
             self.__topic = topic
         else:
-            raise IllegalArgumentException(
-                f"topic does not match the regex [regex={Misc.TOPIC_PATTERN}]"
-            )
+            raise IllegalArgumentException(f"topic does not match the regex 
[regex={Misc.TOPIC_PATTERN}].")
+
+    @lite_topic.setter
+    def lite_topic(self, lite_topic: str):
+        if not lite_topic or not lite_topic.strip():
+            raise IllegalArgumentException("lite_topic has not been set yet.")
+        if self.__message_group:
+            raise IllegalArgumentException("lite_topic and message_group 
should not be set at same time.")
+        if self.__delivery_timestamp:
+            raise IllegalArgumentException("lite_topic and delivery_timestamp 
should not be set at same time.")
+        self.__lite_topic = lite_topic
 
     @message_id.setter
     def message_id(self, message_id):
         self.__message_id = message_id
 
     @tag.setter
-    def tag(self, tag):
-        if tag is None or tag.strip() == "":
-            raise IllegalArgumentException("tag should not be blank")
+    def tag(self, tag: str):
+        if not tag or not tag.strip():
+            raise IllegalArgumentException("tag should not be blank.")
         if "|" in tag:
-            raise IllegalArgumentException('tag should not contain "|"')
+            raise IllegalArgumentException('tag should not contain "|".')
         self.__tag = tag
 
     @message_group.setter
-    def message_group(self, message_group):
-        if self.__delivery_timestamp is not None:
-            raise IllegalArgumentException(
-                "deliveryTimestamp and messageGroup should not be set at same 
time"
-            )
-        if message_group is None or len(message_group) == 0:
-            raise IllegalArgumentException("messageGroup should not be blank")
+    def message_group(self, message_group: str):
+        if not message_group or not message_group.strip():
+            raise IllegalArgumentException("message_group should not be 
blank.")
+        if self.__delivery_timestamp:
+            raise IllegalArgumentException("delivery_timestamp and 
message_group should not be set at same time.")
+        if self.__lite_topic:
+            raise IllegalArgumentException("lite_topic and message_group 
should not be set at same time.")
         self.__message_group = message_group
 
     @delivery_timestamp.setter
-    def delivery_timestamp(self, delivery_timestamp):
-        if self.__message_group is not None:
-            raise IllegalArgumentException(
-                "deliveryTimestamp and messageGroup should not be set at same 
time"
-            )
+    def delivery_timestamp(self, delivery_timestamp: int):
+        if self.__message_group:
+            raise IllegalArgumentException("delivery_timestamp and 
message_group should not be set at same time.")
+        if self.__lite_topic:
+            raise IllegalArgumentException("lite_topic and delivery_timestamp 
should not be set at same time.")
         self.__delivery_timestamp = delivery_timestamp
 
     @transport_delivery_timestamp.setter
@@ -246,10 +261,10 @@ class Message:
         self.__transport_delivery_timestamp = transport_delivery_timestamp
 
     @keys.setter
-    def keys(self, *keys):
+    def keys(self, *keys: str):
         for key in keys:
-            if not key or key.strip() == "":
-                raise IllegalArgumentException("key should not be blank")
+            if not key or not key.strip():
+                raise IllegalArgumentException("key should not be blank.")
         self.__keys.update(set(keys))
 
     @receipt_handle.setter
@@ -264,11 +279,11 @@ class Message:
     def endpoints(self, endpoints):
         self.__endpoints = endpoints
 
-    def add_property(self, key, value):
-        if key is None or key.strip() == "":
-            raise IllegalArgumentException("key should not be blank")
-        if value is None or value.strip() == "":
-            raise IllegalArgumentException("value should not be blank")
+    def add_property(self, key: str, value: str):
+        if not key or not key.strip():
+            raise IllegalArgumentException("key should not be blank.")
+        if not value or not value.strip():
+            raise IllegalArgumentException("value should not be blank.")
         self.__properties[key] = value
 
     @staticmethod
@@ -281,5 +296,7 @@ class Message:
             return "DELAY"
         elif message_type == 4:
             return "TRANSACTION"
+        elif message_type == 5:
+            return "LITE"
         else:
             return "MESSAGE_TYPE_UNSPECIFIED"
diff --git a/python/rocketmq/v5/model/process_queue.py 
b/python/rocketmq/v5/model/process_queue.py
index 4d876485..04a1d2d6 100644
--- a/python/rocketmq/v5/model/process_queue.py
+++ b/python/rocketmq/v5/model/process_queue.py
@@ -72,7 +72,7 @@ class ProcessQueue:
     def expired(self, long_polling_timeout, request_timeout):
         max_idle_duration = (long_polling_timeout + request_timeout) * 3
         idle_duration = int(time.time()) - self.__active_time
-        if idle_duration < 0:
+        if idle_duration - max_idle_duration < 0:
             return False
         after_cache_full_duration = int(time.time()) - self.__cache_full_time
         if after_cache_full_duration - max_idle_duration < 0:
diff --git a/python/rocketmq/v5/model/retry_policy.py 
b/python/rocketmq/v5/model/retry_policy.py
index 063d6424..3e6d7092 100644
--- a/python/rocketmq/v5/model/retry_policy.py
+++ b/python/rocketmq/v5/model/retry_policy.py
@@ -18,14 +18,14 @@ from rocketmq.v5.exception import IllegalArgumentException
 
 class RetryPolicy:
 
-    DEFAULT_MAX_ATTEMPTS = 17
-    DEFAULT_RECONSUME_DELAY = 1
+    DEFAULT_RECONSUME_DELAY = 1  # seconds
+    DEFAULT_RESEND_DELAY = 1  # seconds
 
-    def __init__(self, backoff_policy):
+    def __init__(self, backoff_policy, max_attempts):
         if backoff_policy:
             self.__max_attempts = backoff_policy.max_attempts
         else:
-            self.__max_attempts = RetryPolicy.DEFAULT_MAX_ATTEMPTS
+            self.__max_attempts = max_attempts
 
     @property
     def max_attempts(self):
@@ -34,13 +34,16 @@ class RetryPolicy:
 
 class CustomizedBackoffRetryPolicy(RetryPolicy):
 
-    def __init__(self, backoff_policy):
-        super().__init__(backoff_policy)
+    def __init__(self, backoff_policy, default_max_attempts):
+        super().__init__(backoff_policy, default_max_attempts)
         if backoff_policy:
             self.__durations = list(map(lambda item: item.seconds, 
backoff_policy.customized_backoff.next))
         else:
             self.__durations = list()
 
+    def __eq__(self, other):
+        return self.max_attempts == other.max_attempts and self.__durations == 
other.__durations
+
     def get_next_attempt_delay(self, attempt):
         if attempt < 0:
             raise IllegalArgumentException("attempt must be positive")
@@ -49,3 +52,48 @@ class CustomizedBackoffRetryPolicy(RetryPolicy):
             return self.__durations[size - 1] if attempt > size else 
self.__durations[attempt - 1]
         else:
             return RetryPolicy.DEFAULT_RECONSUME_DELAY
+
+    @property
+    def durations(self):
+        return self.__durations
+
+
+class ExponentialBackoffRetryPolicy(RetryPolicy):
+
+    def __init__(self, backoff_policy, default_max_attempts):
+        super().__init__(backoff_policy, default_max_attempts)
+        if backoff_policy:
+            self.__initial_backoff = 
backoff_policy.exponential_backoff.initial.seconds * 1_000_000_000 + 
backoff_policy.exponential_backoff.initial.nanos  # nanos
+            self.__max_backoff = 
backoff_policy.exponential_backoff.max.seconds * 1_000_000_000 + 
backoff_policy.exponential_backoff.max.nanos  # nanos
+            self.__multiplier = backoff_policy.exponential_backoff.multiplier
+        else:
+            self.__initial_backoff = None
+            self.__max_backoff = None
+            self.__multiplier = None
+
+    def __eq__(self, other):
+        return self.max_attempts == other.max_attempts and 
self.initial_backoff == other.initial_backoff and self.max_backoff == 
other.max_backoff and self.multiplier == other.multiplier
+
+    def get_next_attempt_delay(self, attempt):
+        if attempt < 0:
+            raise IllegalArgumentException("attempt must be positive")
+        if self.__initial_backoff and self.__max_backoff and self.__multiplier:
+            exp_backoff_nanos = self.__initial_backoff * (self.__multiplier ** 
(attempt - 1))
+            delay_nanos = int(min(exp_backoff_nanos, self.__max_backoff))
+            if delay_nanos <= 0:
+                return 0
+            return delay_nanos / 1_000_000_000
+        else:
+            return RetryPolicy.DEFAULT_RESEND_DELAY
+
+    @property
+    def initial_backoff(self):
+        return self.__initial_backoff
+
+    @property
+    def max_backoff(self):
+        return self.__max_backoff
+
+    @property
+    def multiplier(self):
+        return self.__multiplier
diff --git a/python/rocketmq/v5/producer/producer.py 
b/python/rocketmq/v5/producer/producer.py
index 0f16cf3e..ec325005 100644
--- a/python/rocketmq/v5/producer/producer.py
+++ b/python/rocketmq/v5/producer/producer.py
@@ -33,6 +33,7 @@ from rocketmq.v5.exception import (ClientException, 
IllegalArgumentException,
                                    TooManyRequestsException)
 from rocketmq.v5.log import logger
 from rocketmq.v5.model import CallbackResult, Message, SendReceipt
+from rocketmq.v5.model.retry_policy import ExponentialBackoffRetryPolicy
 from rocketmq.v5.util import (ConcurrentMap, MessageIdCodec,
                               MessagingResultChecker, Misc)
 
@@ -125,7 +126,9 @@ class TransactionChecker(metaclass=abc.ABCMeta):
 
 
 class Producer(Client):
-    MAX_SEND_ATTEMPTS = 3  # max retry times when send failed
+
+    DEFAULT_MAX_SEND_ATTEMPTS = 3  # default max retry times when send failed
+    DEFAULT_MAX_BODY_SIZE = 4 * 1024 * 1024  # default max message body size
 
     def __init__(
         self, client_configuration, topics=None, checker=None, tls_enable=False
@@ -137,6 +140,12 @@ class Producer(Client):
             checker  # checker for transaction message, handle checking from 
server
         )
         self.__transaction_check_executor = ThreadPoolExecutor(max_workers=1, 
thread_name_prefix="transaction_check_thread")
+        self.__max_body_size = Producer.DEFAULT_MAX_BODY_SIZE
+        self.__validate_message_type = True
+        self.__backoff_policy = ExponentialBackoffRetryPolicy(
+            None,
+            Producer.DEFAULT_MAX_SEND_ATTEMPTS
+        )
 
     def __str__(self):
         return f"{ClientType.Name(self.client_type)} 
client_id:{self.client_id}"
@@ -149,7 +158,7 @@ class Producer(Client):
 
         self.__wrap_sending_message(message, False if transaction is None else 
True)
         topic_queue = self.__select_send_queue(message)
-        if message.message_type not in topic_queue.accept_message_types:
+        if self.__validate_message_type and message.message_type not in 
topic_queue.accept_message_types:
             raise IllegalArgumentException(
                 f"current message type not match with queue accept message 
types, topic:{message.topic}, 
message_type:{Message.message_type_desc(message.message_type)}, queue access 
type:{topic_queue.accept_message_types_desc()}"
             )
@@ -264,6 +273,19 @@ class Producer(Client):
 
     """ override """
 
+    def reset_setting(self, settings):
+        self.__max_body_size = settings.publishing.max_body_size
+        self.__validate_message_type = 
settings.publishing.validate_message_type
+        use_exponential = settings and settings.backoff_policy and 
settings.backoff_policy.WhichOneof("strategy") == "exponential_backoff"
+        new_policy = ExponentialBackoffRetryPolicy(
+            settings.backoff_policy if use_exponential else None,
+            Producer.DEFAULT_MAX_SEND_ATTEMPTS
+        )
+        if use_exponential and self.__backoff_policy != new_policy:
+            self.__backoff_policy = new_policy
+        if not self._init_settings_event.is_set():
+            self._init_settings_event.set()
+
     def _on_start(self):
         logger.info(f"{self} start success.")
 
@@ -355,7 +377,7 @@ class Producer(Client):
             retry_exception_future = self.__check_send_retry_condition(
                 message, topic_queue, attempt, e
             )
-            if retry_exception_future is not None:
+            if retry_exception_future:
                 # end retry with exception
                 self.client_metrics.send_after(send_metric_context, False)
                 raise retry_exception_future.exception()
@@ -435,7 +457,7 @@ class Producer(Client):
 
     def __check_send_retry_condition(self, message, topic_queue, attempt, e):
         end_retry = False
-        if attempt > Producer.MAX_SEND_ATTEMPTS:
+        if attempt > self.__backoff_policy.max_attempts:
             logger.error(
                 f"{self} failed to send message to {topic_queue.endpoints}, 
because of run out of attempt times, topic:{message.topic}, 
message_id:{message.message_id},  message_type:{message.message_type}, 
attempt:{attempt}"
             )
@@ -465,7 +487,7 @@ class Producer(Client):
             msg = req.messages.add()
             msg.topic.name = message.topic
             msg.topic.resource_namespace = self.client_configuration.namespace
-            if message.body is None or len(message.body) == 0:
+            if not message.body or len(message.body) == 0:
                 raise IllegalArgumentException("message body is none.")
             max_body_size = 4 * 1024 * 1024  # max body size is 4m
             if len(message.body) > max_body_size:
@@ -474,20 +496,23 @@ class Producer(Client):
                 )
 
             msg.body = message.body
-            if message.tag is not None:
+            if message.lite_topic:
+                msg.system_properties.lite_topic = message.lite_topic
+            if message.tag:
                 msg.system_properties.tag = message.tag
-            if message.keys is not None:
+            if message.keys:
                 msg.system_properties.keys.extend(message.keys)
-            if message.properties is not None:
+            if message.properties:
                 msg.user_properties.update(message.properties)
             msg.system_properties.message_id = message.message_id
             msg.system_properties.message_type = message.message_type
             msg.system_properties.born_timestamp.seconds = int(time.time())
             msg.system_properties.born_host = Misc.get_local_ip()
             msg.system_properties.body_encoding = Encoding.IDENTITY
-            if message.message_group is not None:
+
+            if message.message_group:
                 msg.system_properties.message_group = message.message_group
-            if message.delivery_timestamp is not None:
+            if message.delivery_timestamp:
                 msg.system_properties.delivery_timestamp.seconds = (
                     message.delivery_timestamp
                 )
@@ -497,22 +522,26 @@ class Producer(Client):
 
     def __send_message_type(self, message: Message, is_transaction=False):
         if (
-            message.message_group is None
-            and message.delivery_timestamp is None
-            and is_transaction is False
+            not message.message_group
+            and not message.lite_topic
+            and not message.delivery_timestamp
+            and not is_transaction
         ):
             return MessageType.NORMAL
 
-        if message.message_group is not None and is_transaction is False:
+        if message.message_group and not is_transaction:
             return MessageType.FIFO
 
-        if message.delivery_timestamp is not None and is_transaction is False:
+        if message.lite_topic and not is_transaction:
+            return MessageType.LITE
+
+        if message.delivery_timestamp and not is_transaction:
             return MessageType.DELAY
 
         if (
-            message.message_group is None
-            and message.delivery_timestamp is None
-            and is_transaction is True
+            not message.message_group
+            and not message.delivery_timestamp
+            and is_transaction
         ):
             return MessageType.TRANSACTION
 
@@ -595,7 +624,7 @@ class Producer(Client):
     def __server_transaction_check_callback(self, future, message, transaction_id, result):
         try:
             res = future.result()
-            if res is not None and res.status.code == Code.OK:
+            if res and res.status.code == Code.OK:
                 if result == TransactionResolution.COMMIT:
                     logger.debug(
                         f"{self} commit message. message_id: {message.message_id}, transaction_id: {transaction_id}, res: {res}"
diff --git a/python/rocketmq/v5/util/messaging_result_checker.py b/python/rocketmq/v5/util/messaging_result_checker.py
index 4091397a..f294be16 100644
--- a/python/rocketmq/v5/util/messaging_result_checker.py
+++ b/python/rocketmq/v5/util/messaging_result_checker.py
@@ -15,7 +15,11 @@
 
 from rocketmq.grpc_protocol import Code, Status
 from rocketmq.v5.exception import (BadRequestException, ForbiddenException,
-                                   InternalErrorException, NotFoundException,
+                                   IllegalStateException,
+                                   InternalErrorException,
+                                   LiteSubscriptionQuotaExceededException,
+                                   LiteTopicQuotaExceededException,
+                                   NotFoundException, PayloadEmptyException,
                                    PayloadTooLargeException,
                                    PaymentRequiredException,
                                    ProxyTimeoutException,
@@ -29,6 +33,9 @@ class MessagingResultChecker:
 
     @staticmethod
     def check(status: Status):
+        if not status or not status.code:
+            raise IllegalStateException("response is illegal.")
+
         code = status.code
         message = status.message
 
@@ -42,6 +49,7 @@ class MessagingResultChecker:
             or code == Code.ILLEGAL_MESSAGE_TAG
             or code == Code.ILLEGAL_MESSAGE_KEY
             or code == Code.ILLEGAL_MESSAGE_GROUP
+            or code == Code.ILLEGAL_LITE_TOPIC
             or code == Code.ILLEGAL_MESSAGE_PROPERTY_KEY
             or code == Code.INVALID_TRANSACTION_ID
             or code == Code.ILLEGAL_MESSAGE_ID
@@ -72,8 +80,14 @@ class MessagingResultChecker:
             raise NotFoundException(message, code)
         elif code == Code.PAYLOAD_TOO_LARGE or code == Code.MESSAGE_BODY_TOO_LARGE:
             raise PayloadTooLargeException(message, code)
+        elif code == Code.MESSAGE_BODY_EMPTY:
+            raise PayloadEmptyException(message, code)
         elif code == Code.TOO_MANY_REQUESTS:
             raise TooManyRequestsException(message, code)
+        elif code == Code.LITE_TOPIC_QUOTA_EXCEEDED:
+            raise LiteTopicQuotaExceededException(message, code)
+        elif code == Code.LITE_SUBSCRIPTION_QUOTA_EXCEEDED:
+            raise LiteSubscriptionQuotaExceededException(message, code)
         elif (
             code == Code.REQUEST_HEADER_FIELDS_TOO_LARGE
             or code == Code.MESSAGE_PROPERTIES_TOO_LARGE
diff --git a/python/rocketmq/v5/util/misc.py b/python/rocketmq/v5/util/misc.py
index be3e5d3d..a377ab4f 100644
--- a/python/rocketmq/v5/util/misc.py
+++ b/python/rocketmq/v5/util/misc.py
@@ -31,7 +31,7 @@ class Misc:
     __OS_NAME = None
     TOPIC_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$")
     CONSUMER_GROUP_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$")
-    SDK_VERSION = "5.0.9"
+    SDK_VERSION = "5.1.0"
 
     @staticmethod
     def sdk_language():

Reply via email to