herywang opened a new issue, #951:
URL: https://github.com/apache/rocketmq-clients/issues/951

   ### Before Creating the Bug Report
   
   - [x] I found a bug, not just asking a question, which should be created in 
[GitHub Discussions](https://github.com/apache/rocketmq-clients/discussions).
   
   - [x] I have searched the [GitHub 
Issues](https://github.com/apache/rocketmq-clients/issues) and [GitHub 
Discussions](https://github.com/apache/rocketmq-clients/discussions)  of this 
repository and believe that this is not a duplicate.
   
   - [x] I have confirmed that this bug belongs to the current repository, not 
other repositories of RocketMQ.
   
   
   ### Programming Language of the Client
   
   Python
   
   ### Runtime Platform Environment
   
   Ubuntu 20.04  - rocketserver
   MacOS 15.3.1 - python-sdk-client
   Python - 3.9.19
   
   ### RocketMQ Version of the Client/Server
   
   client: rocketmq-python-client==5.0.3
   server: rocketmq 5.1.0
   
   ### Run or Compiler Version
   
   openjdk 1.8
   
   ### Describe the Bug
   
   Hello guys!
   
   I found a bug (or maybe I misunderstood). When I use SimpleConsumer 
python-sdk to consume messages from RocketMQ, and call 
`change_invisible_duration` to extend the message's invisibility duration 
during consumption, the message is added to the %RETRY% Topic.
   
   Business Scenario: The duration of consumption tasks varies greatly. I use 
SimpleConsumer in pull mode. During consumption, a background thread uses 
change_invisible_duration to extend the invisibility duration.
   
   Version Number: rocketmq-python-client==5.0.3
   
   According to the official documentation's description, extending the 
invisibility duration should keep the current message in the current topic, not 
move it to the retry topic. Is my understanding incorrect?
   
   ### Steps to Reproduce
   
   1. 定义一个消费者, 从某个主题接受消息
   2. 接收到消息之后, 将该消息放到一个集合中, 后台哨兵线程每10s续期一次不可见时间30s
   3. 当执行续期之后, 观察%RETRY% topic, 发现 被续期的这个message 进入到了这个重试Topic.
   
   导致的直接结果: 消费者会一直存在重复消费, 即使正常队列中的消息消费完成, 还会再重复消费重试队列中的消息.
   
   下面三张图片是实验复现路径, 图片后面附着了代码
   
   <img width="1597" alt="Image" 
src="https://github.com/user-attachments/assets/fae71960-b7c8-40c6-831a-2465f01ec5c3";
 />
   
   <img width="1228" alt="Image" 
src="https://github.com/user-attachments/assets/7a313d6b-ddac-4d9d-9b73-8e7f374309b6";
 />
   
   <img width="1675" alt="Image" 
src="https://github.com/user-attachments/assets/36d6a86d-26d2-442f-9f63-419cdb297760";
 />
   
   下面是案例代码:
   ```python
   def content_rewrite_consumer_executor():
       """
       启动内容改写Consumer后台线程
       """
       ak = config.ROCKETMQ_CONFIG.access_key if 
config.ROCKETMQ_CONFIG.access_key is not None else ""
       sk = config.ROCKETMQ_CONFIG.secret_key if 
config.ROCKETMQ_CONFIG.secret_key is not None else ""
       credentials = Credentials(ak, sk)
       rocket_conf = ClientConfiguration(config.ROCKETMQ_CONFIG.endpoints, 
credentials)
       consumer_group = config.ROCKETMQ_CONFIG.consumer_group
       try:
           global __consumer__
           __consumer__ = SimpleConsumer(rocket_conf, consumer_group)
           __consumer__.startup()
           
__consumer__.subscribe(config.ROCKETMQ_CONFIG.content_rewrite_topic())
       except Exception as e:
           print(f"内容改写消费者启动失败!程序退出!失败原因: {e}")
           exit(-1)
   
       
logger.info(f"内容改写Simple消费者启动完成,订阅主题:{config.ROCKETMQ_CONFIG.content_rewrite_topic()}")
       logger.info(f"开始启动改写任务监听线程")
       __start_rewritten_task_monitor_thread__()
       while True:
           try:
               messages: List[Message] = __consumer__.receive(10, 30)
               logger.info(f"接收到消息数量:{len(messages)}")
               if messages is None or len(messages) == 0:
                   continue
   
               print(f"{__consumer__.__str__()} receive {len(messages)} 
messages.")
               with __lock__:
                   for msg in messages:
                       msg_id = msg.message_id
                       else:
                           # 将任务添加到list中
                           __inflight_task_map__[msg_id] = msg
                           # 提交改写任务到线程池, 线程池执行完成之后, 从__inflight_task_map__中删除, 
并且发送consumer.ack(msg)
                           __executor__.submit(rewrite_task, msg)
   
                           consumer_cache[msg_id] = True
   
               # 如果任务数大于10,等待,不要再拉取改写任务了。
               while len(__inflight_task_map__) >= 10:
                   time.sleep(2)
   
           except Exception as e:
               print(f"receive message raise exception: {e}")
   
   def __rewritten_task_monitor_thread__():
       logger.info(f"哨兵监控线程启动成功!")
       """
       后台线程,轮训正在执行的任务列表,续期不可见时间
       """
       global __consumer__
       while True:
           logger.info("当前任务数 %d,  开始执行不可见时间续期...", len(__inflight_task_map__))
           if __consumer__ is not None:
               with __lock__:
                   for k, v in __inflight_task_map__.items():
                       __consumer__.change_invisible_duration(v, 10)
                       print(f"续期10秒,messageId:{v.message_id}")
           time.sleep(5)
   
   ```
   
   ### What Did You Expect to See?
   
   解决重复消费问题.
   
   ### What Did You See Instead?
   
   -
   
   ### Additional Context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to