deku0818 opened a new issue, #1198:
URL: https://github.com/apache/rocketmq-clients/issues/1198

   ### Before Creating the Bug Report
   
   - [x] I found a bug, not just asking a question, which should be created in 
[GitHub Discussions](https://github.com/apache/rocketmq-clients/discussions).
   
   - [x] I have searched the [GitHub 
Issues](https://github.com/apache/rocketmq-clients/issues) and [GitHub 
Discussions](https://github.com/apache/rocketmq-clients/discussions)  of this 
repository and believe that this is not a duplicate.
   
   - [x] I have confirmed that this bug belongs to the current repository, not 
other repositories of RocketMQ.
   
   
   ### Programming Language of the Client
   
   Python
   
   ### Runtime Platform Environment
   
   OS: Ubuntu 24.04 (Linux 6.14.0-1018-aws x86_64)
   
   ### RocketMQ Version of the Client/Server
   
   Client: rocketmq-python-client 5.1.1
   Server: RocketMQ 5.x (proxy mode, endpoint port 8081)
   
   ### Run or Compiler Version
   
   Runtime: Python 3.11.10
   OS: Ubuntu 24.04 LTS
   
   ### Describe the Bug
   
   `PushConsumer.__receive()` calls `self._receive_req()` without passing 
`invisible_duration`, even though the parent `Consumer._receive_req()` method 
supports it. This causes the receive request to be sent to the broker with no 
`invisible_duration` set, relying entirely on `auto_renew=True`. When the 
broker's auto-renew mechanism does not work reliably, the receipt handle 
expires before the message is acknowledged, resulting in **duplicate message 
delivery during processing**.
   
   The problematic code is in `rocketmq/v5/consumer/push/push_consumer.py` line 
247-248:
   
   ```python
   def __receive(self, message_queue, process_queue, attempt_id):
       try:
           max_message_num = process_queue.max_receive_batch_size(...)
           req = self._receive_req(message_queue.topic, message_queue, 
max_message_num, True,
                                   
long_polling_timeout=self.__long_polling_timeout, attempt_id=attempt_id)
           # ^^^ invisible_duration is NOT passed here
   ```
   
   While `Consumer._receive_req()` already supports the parameter:
   
   ```python
   def _receive_req(self, topic, queue, max_message_num, auto_renew,
                    invisible_duration=None, long_polling_timeout=None, 
attempt_id=None):
       ...
       if invisible_duration:
           req.invisible_duration.seconds = invisible_duration
   ```
   
   ### Steps to Reproduce
   
   1. Create a `PushConsumer` that subscribes to a topic
   2. Send a message to that topic
   3. In the `MessageListener`, perform a long-running task (e.g., 1-2 minutes)
   4. Observe that the same message is delivered again **while the first 
invocation is still processing**
   
   ### What Did You Expect to See?
   
   Each message should be delivered only once. During processing, the message 
should remain invisible to other consumers until the listener returns and 
ACK/NACK is sent. Either:
   
   - `auto_renew` should reliably extend the invisible duration, OR
   - `PushConsumer.__receive()` should allow users to configure 
`invisible_duration` as a safety net
   
   
   ### What Did You See Instead?
   
   The same message (`thread_id=qa-9939ca1b`) was delivered 3 times within ~3 
minutes while the first invocation was still processing:
   
   ```
   04:51:25 | INFO | 收到 worker 消息, thread_id=qa-9939ca1b   ← first delivery 
(correct)
   04:52:45 | INFO | 收到 worker 消息, thread_id=qa-9939ca1b   ← duplicate (80s 
later, still processing)
   04:53:08 | INFO | 消息处理成功, thread_id=qa-9939ca1b       ← first one finishes
   04:54:05 | INFO | 收到 worker 消息, thread_id=qa-9939ca1b   ← duplicate again
   ```
   
   ### Additional Context
   
   **Workaround**: Subclass `PushConsumer` and override `__receive` via Python 
name-mangling to inject `invisible_duration`:
   
   ```python
   class SafePushConsumer(PushConsumer):
       def __init__(self, *args, invisible_duration: int = 300, **kwargs):
           super().__init__(*args, **kwargs)
           self._invisible_duration = invisible_duration
   
       def _PushConsumer__receive(self, message_queue, process_queue, 
attempt_id):
           max_message_num = process_queue.max_receive_batch_size(
               
self._PushConsumer__queue_threshold(self._PushConsumer__max_cache_message_count),
               self._PushConsumer__receive_batch_size,
           )
           req = self._receive_req(
               message_queue.topic, message_queue, max_message_num, True,
               invisible_duration=self._invisible_duration,
               long_polling_timeout=self._PushConsumer__long_polling_timeout,
               attempt_id=attempt_id,
           )
           return self._receive(
               message_queue, req,
               self._PushConsumer__long_polling_timeout + 
self.client_configuration.request_timeout,
           )
   ```
   
   **Suggested fix**: Add an `invisible_duration` parameter to 
`PushConsumer.__init__()` and pass it through in `__receive()`. This is a 
one-line change in the SDK.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to