herywang opened a new issue, #951: URL: https://github.com/apache/rocketmq-clients/issues/951
### Before Creating the Bug Report - [x] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq-clients/discussions). - [x] I have searched the [GitHub Issues](https://github.com/apache/rocketmq-clients/issues) and [GitHub Discussions](https://github.com/apache/rocketmq-clients/discussions) of this repository and believe that this is not a duplicate. - [x] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ. ### Programming Language of the Client Python ### Runtime Platform Environment Ubuntu 20.04 - rocketserver MacOS 15.3.1 - python-sdk-client Python - 3.9.19 ### RocketMQ Version of the Client/Server client: rocketmq-python-client==5.0.3 server: rocketmq 5.1.0 ### Run or Compiler Version openjdk 1.8 ### Describe the Bug Hello guys! I found a bug (or maybe I misunderstood). When I use SimpleConsumer python-sdk to consume messages from RocketMQ, and call `change_invisible_duration` to extend the message's invisibility duration during consumption, the message is added to the %RETRY% Topic. Business Scenario: The duration of consumption tasks varies greatly. I use SimpleConsumer in pull mode. During consumption, a background thread uses change_invisible_duration to extend the invisibility duration. Version Number: rocketmq-python-client==5.0.3 According to the official documentation's description, extending the invisibility duration should keep the current message in the current topic, not move it to the retry topic. Is my understanding incorrect? ### Steps to Reproduce 1. 定义一个消费者, 从某个主题接受消息 2. 接收到消息之后, 将该消息放到一个集合中, 后台哨兵线程每10s续期一次不可见时间30s 3. 当执行续期之后, 观察%RETRY% topic, 发现 被续期的这个message 进入到了这个重试Topic. 导致的直接结果: 消费者会一直存在重复消费, 即使正常队列中的消息消费完成, 还会再重复消费重试队列中的消息. 下面三张图片是实验复现路径, 图片后面附着了代码 <img width="1597" alt="Image" src="https://github.com/user-attachments/assets/fae71960-b7c8-40c6-831a-2465f01ec5c3" /> <img width="1228" alt="Image" src="https://github.com/user-attachments/assets/7a313d6b-ddac-4d9d-9b73-8e7f374309b6" /> <img width="1675" alt="Image" src="https://github.com/user-attachments/assets/36d6a86d-26d2-442f-9f63-419cdb297760" /> 下面是案例代码: ```python def content_rewrite_consumer_executor(): """ 启动内容改写Consumer后台线程 """ ak = config.ROCKETMQ_CONFIG.access_key if config.ROCKETMQ_CONFIG.access_key is not None else "" sk = config.ROCKETMQ_CONFIG.secret_key if config.ROCKETMQ_CONFIG.secret_key is not None else "" credentials = Credentials(ak, sk) rocket_conf = ClientConfiguration(config.ROCKETMQ_CONFIG.endpoints, credentials) consumer_group = config.ROCKETMQ_CONFIG.consumer_group try: global __consumer__ __consumer__ = SimpleConsumer(rocket_conf, consumer_group) __consumer__.startup() __consumer__.subscribe(config.ROCKETMQ_CONFIG.content_rewrite_topic()) except Exception as e: print(f"内容改写消费者启动失败!程序退出!失败原因: {e}") exit(-1) logger.info(f"内容改写Simple消费者启动完成,订阅主题:{config.ROCKETMQ_CONFIG.content_rewrite_topic()}") logger.info(f"开始启动改写任务监听线程") __start_rewritten_task_monitor_thread__() while True: try: messages: List[Message] = __consumer__.receive(10, 30) logger.info(f"接收到消息数量:{len(messages)}") if messages is None or len(messages) == 0: continue print(f"{__consumer__.__str__()} receive {len(messages)} messages.") with __lock__: for msg in messages: msg_id = msg.message_id else: # 将任务添加到list中 __inflight_task_map__[msg_id] = msg # 提交改写任务到线程池, 线程池执行完成之后, 从__inflight_task_map__中删除, 并且发送consumer.ack(msg) __executor__.submit(rewrite_task, msg) consumer_cache[msg_id] = True # 如果任务数大于10,等待,不要再拉取改写任务了。 while len(__inflight_task_map__) >= 10: time.sleep(2) except Exception as e: print(f"receive message raise exception: {e}") def __rewritten_task_monitor_thread__(): logger.info(f"哨兵监控线程启动成功!") """ 后台线程,轮训正在执行的任务列表,续期不可见时间 """ global __consumer__ while True: logger.info("当前任务数 %d, 开始执行不可见时间续期...", len(__inflight_task_map__)) if __consumer__ is not None: with __lock__: for k, v in __inflight_task_map__.items(): __consumer__.change_invisible_duration(v, 10) print(f"续期10秒,messageId:{v.message_id}") time.sleep(5) ``` ### What Did You Expect to See? 解决重复消费问题. ### What Did You See Instead? - ### Additional Context _No response_ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
