deku0818 opened a new issue, #1198: URL: https://github.com/apache/rocketmq-clients/issues/1198
### Before Creating the Bug Report - [x] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq-clients/discussions). - [x] I have searched the [GitHub Issues](https://github.com/apache/rocketmq-clients/issues) and [GitHub Discussions](https://github.com/apache/rocketmq-clients/discussions) of this repository and believe that this is not a duplicate. - [x] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ. ### Programming Language of the Client Python ### Runtime Platform Environment OS: Ubuntu 24.04 (Linux 6.14.0-1018-aws x86_64) ### RocketMQ Version of the Client/Server Client: rocketmq-python-client 5.1.1 Server: RocketMQ 5.x (proxy mode, endpoint port 8081) ### Run or Compiler Version Runtime: Python 3.11.10 OS: Ubuntu 24.04 LTS ### Describe the Bug `PushConsumer.__receive()` calls `self._receive_req()` without passing `invisible_duration`, even though the parent `Consumer._receive_req()` method supports it. This causes the receive request to be sent to the broker with no `invisible_duration` set, relying entirely on `auto_renew=True`. When the broker's auto-renew mechanism does not work reliably, the receipt handle expires before the message is acknowledged, resulting in **duplicate message delivery during processing**. The problematic code is in `rocketmq/v5/consumer/push/push_consumer.py` line 247-248: ```python def __receive(self, message_queue, process_queue, attempt_id): try: max_message_num = process_queue.max_receive_batch_size(...) req = self._receive_req(message_queue.topic, message_queue, max_message_num, True, long_polling_timeout=self.__long_polling_timeout, attempt_id=attempt_id) # ^^^ invisible_duration is NOT passed here ``` While `Consumer._receive_req()` already supports the parameter: ```python def _receive_req(self, topic, queue, max_message_num, auto_renew, invisible_duration=None, long_polling_timeout=None, attempt_id=None): ... if invisible_duration: req.invisible_duration.seconds = invisible_duration ``` ### Steps to Reproduce 1. Create a `PushConsumer` that subscribes to a topic 2. Send a message to that topic 3. In the `MessageListener`, perform a long-running task (e.g., 1-2 minutes) 4. Observe that the same message is delivered again **while the first invocation is still processing** ### What Did You Expect to See? Each message should be delivered only once. During processing, the message should remain invisible to other consumers until the listener returns and ACK/NACK is sent. Either: - `auto_renew` should reliably extend the invisible duration, OR - `PushConsumer.__receive()` should allow users to configure `invisible_duration` as a safety net ### What Did You See Instead? The same message (`thread_id=qa-9939ca1b`) was delivered 3 times within ~3 minutes while the first invocation was still processing: ``` 04:51:25 | INFO | 收到 worker 消息, thread_id=qa-9939ca1b ← first delivery (correct) 04:52:45 | INFO | 收到 worker 消息, thread_id=qa-9939ca1b ← duplicate (80s later, still processing) 04:53:08 | INFO | 消息处理成功, thread_id=qa-9939ca1b ← first one finishes 04:54:05 | INFO | 收到 worker 消息, thread_id=qa-9939ca1b ← duplicate again ``` ### Additional Context **Workaround**: Subclass `PushConsumer` and override `__receive` via Python name-mangling to inject `invisible_duration`: ```python class SafePushConsumer(PushConsumer): def __init__(self, *args, invisible_duration: int = 300, **kwargs): super().__init__(*args, **kwargs) self._invisible_duration = invisible_duration def _PushConsumer__receive(self, message_queue, process_queue, attempt_id): max_message_num = process_queue.max_receive_batch_size( self._PushConsumer__queue_threshold(self._PushConsumer__max_cache_message_count), self._PushConsumer__receive_batch_size, ) req = self._receive_req( message_queue.topic, message_queue, max_message_num, True, invisible_duration=self._invisible_duration, long_polling_timeout=self._PushConsumer__long_polling_timeout, attempt_id=attempt_id, ) return self._receive( message_queue, req, self._PushConsumer__long_polling_timeout + self.client_configuration.request_timeout, ) ``` **Suggested fix**: Add an `invisible_duration` parameter to `PushConsumer.__init__()` and pass it through in `__receive()`. This is a one-line change in the SDK. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
